Changeset 1620

Show
Ignore:
Timestamp:
04/18/08 16:29:00 (3 weeks ago)
Author:
till
Message:

nebula
- semaphores are used to wakeup worker threads

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • nebula/trunk/src/classify.c

    r1619 r1620  
    4242        int                     rv; 
    4343        qelem                   *elem; 
    44         struct timeval          timeout; 
    45         fd_set                  rfds; 
    46         u_char                  buf; 
    4744 
    4845        for (;;) { 
    49                 FD_ZERO(&rfds); 
    50                 FD_SET(cpipe[0], &rfds); 
    51  
    52                 timeout.tv_sec  = 10; 
    53                 timeout.tv_usec = 0; 
    54  
    55                 // this thread just processes a clustering job queue 
    56                 // it reads wakeup calls from a pipe 
    57                 switch (select(cpipe[0]+1, &rfds, NULL, NULL, &timeout)) { 
    58                 case -1: 
    59                         fprintf(stderr, "Error - select() failed: %s.\n", strerror(errno)); 
     46                if (sem_wait(&sessions_semaphore) == -1) { 
     47                        fprintf(stderr, "Error - Unable to decrement semaphore: %s.\n", strerror(errno)); 
    6048                        exit(EXIT_FAILURE); 
    61                 case 0: 
    62                         break; 
    63                 default: 
    64                         if (FD_ISSET(cpipe[0], &rfds)) { 
    65  
    66                                 if (read(cpipe[0], &buf, 1) != 1) { 
    67                                         fprintf(stderr, "Error - Unable to read from pipe: %s.\n", strerror(errno)); 
    68                                         exit(EXIT_FAILURE); 
    69                                 } 
    70  
    71                                 pthread_mutex_lock(&classifyq_mutex); 
    72                                 elem = queue_cuthead(classifyq); 
    73                                 pthread_mutex_unlock(&classifyq_mutex); 
    74  
    75                                 while (elem) { 
    76                                         // call classification function 
    77                                         if ((rv = classify((submission *) elem->data)) == -1) { 
    78                                                 fprintf(stderr, "Error - Unable to classify submission.\n"); 
    79                                                 free(elem); 
    80                                                 return((void *) -1); 
    81                                         } else if (rv == 0) { 
    82                                                 // known attack, do nothing 
    83                                                 free(elem); 
    84                                                 return((void *) 0); 
    85                                         } else { 
    86                                                 // new attack 
    87                                                 free(elem->data); 
    88                                                 free(elem); 
    89                                         } 
    90  
    91                                         pthread_mutex_lock(&classifyq_mutex); 
    92                                         elem = queue_cuthead(classifyq); 
    93                                         pthread_mutex_unlock(&classifyq_mutex); 
    94                                 } 
    95                         } 
    96                         break; 
     49                } 
     50 
     51                pthread_mutex_lock(&classifyq_mutex); 
     52                elem = queue_cuthead(classifyq); 
     53                pthread_mutex_unlock(&classifyq_mutex); 
     54 
     55                if (elem) { 
     56                        // call classification function 
     57                        if ((rv = classify((submission *) elem->data)) == -1) { 
     58                                fprintf(stderr, "Error - Unable to classify submission.\n"); 
     59                                free(elem); 
     60                                return((void *) -1); 
     61                        } else if (rv == 0) { 
     62                                // known attack, do nothing 
     63                                free(elem); 
     64                                return((void *) 0); 
     65                        } else { 
     66                                // new attack 
     67                                free(elem->data); 
     68                                free(elem); 
     69                        } 
    9770                } 
    9871        } 
     
    293266                for (jqelem = siggenq->head; jqelem && jqelem->data != ((hash*)t->data)->cl; jqelem = jqelem->next); 
    294267 
    295                 // queue cluster for signature generation 
     268                // generate signature for this cluster 
    296269                if (!jqelem || jqelem->data != ((hash*)t->data)->cl) { 
    297270                        // append cluster to signature generation job queue 
     
    300273                                exit(EXIT_FAILURE); 
    301274                        }  
    302                         if (write(gpipe[1], "x", sizeof(u_char)) == -1) { 
    303                                 fprintf(stderr, "Error - Unable to write to pipe: %s.\n", strerror(errno)); 
     275 
     276                        // wake up a signature generation thread 
     277                        if (sem_post(&siggen_semaphore) == -1) { 
     278                                fprintf(stderr, "Error - Unable to increment semaphore: %s.\n", strerror(errno)); 
    304279                                exit(EXIT_FAILURE); 
    305280                        } 
  • nebula/trunk/src/nebula.c

    r1619 r1620  
    7979int main(int argc, char *argv[]) { 
    8080        int             i, qsize, port, listen_fd, rv, gthread_no;  
    81         u_char          daemonize, bell
     81        u_char          daemonize
    8282        char            option; 
    8383        pthread_t       ntid; 
     
    104104 
    105105        lock_mutex              = 1;    // if this is set, threads will lock mutexes 
    106  
    107         bell                    = 1;    // wakeup bell 
    108106 
    109107 
     
    139137        } 
    140138 
     139 
     140        // initialize semaphore 
     141        if ((sem_init(&sessions_semaphore, 0, 0) == -1) || 
     142            (sem_init(&sessions_semaphore, 0, 0) == -1)) { 
     143                fprintf(stderr, "Error - Unable to initialize semaphore: %m.\n"); 
     144                exit(EXIT_FAILURE); 
     145        } 
     146 
    141147        printf("\n  Nebula %s Copyright (C) 2007-2008 Tillmann Werner <tillmann.werner@gmx.de>\n\n", VERSION); 
    142148 
     
    246252 
    247253        // spawn classification thread 
    248         // the main thread writes to a pipe to inform the classification thread about pending jobs 
    249  
    250         if (pipe(cpipe) == -1) { 
    251                 fprintf(stderr, "Error - Unable to create interthread communication pipe: %s.\n", strerror(errno)); 
    252                 exit(EXIT_FAILURE); 
    253         } 
    254  
    255254        if (pthread_attr_init(&ptattr) || pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 
    256255                fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 
     
    266265 
    267266        // spawn signature generation thread(s) 
    268         // the main thread writes to a pipe to inform the generation thread(s) about pending jobs 
    269  
    270267        for (; gthread_no; gthread_no--) { 
    271268                if (verbose > 1) printf("  Signature generation thread spawned.\n"); 
    272                 if (pipe(gpipe) == -1) { 
    273                         fprintf(stderr, "Error - Unable to create interthread communication pipe: %s.\n", strerror(errno)); 
    274                         exit(EXIT_FAILURE); 
    275                 } 
    276269                if (pthread_attr_init(&ptattr) || pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 
    277270                        fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 
     
    419412                                                pthread_mutex_unlock(&classifyq_mutex); 
    420413 
    421                                                 // now send a 'wakeup byte' to the classification thread 
    422                                                 if (write(cpipe[1], &bell, sizeof(u_char)) == -1) { 
    423                                                         fprintf(stderr, "Error - Unable to write to pipe: %s.\n", strerror(errno)); 
     414                                                // wake up a classification thread 
     415                                                if (sem_post(&sessions_semaphore) == -1) { 
     416                                                        fprintf(stderr, "Error - Unable to increment semaphore: %s.\n", strerror(errno)); 
    424417                                                        exit(EXIT_FAILURE); 
    425418                                                } 
  • nebula/trunk/src/nebula.h

    r1618 r1620  
    2828 
    2929#include <pthread.h> 
     30#include <semaphore.h> 
    3031 
    3132#include "avl.h" 
     
    4344nfds_t                  pollfd_set_size;                // size of dynamically adjusted set 
    4445 
    45 int                     lock_mutex, cpipe[2], gpipe[2]
     46int                     lock_mutex
    4647 
    4748u_char                  verbose; 
     
    7273pthread_rwlock_t        sighash_trie_lock; 
    7374 
     75sem_t                   sessions_semaphore; 
     76sem_t                   siggen_semaphore; 
     77 
    7478#endif 
  • nebula/trunk/src/sig.c

    r1618 r1620  
    358358void *pt_pollgq(void *param) { 
    359359        qelem                   *elem; 
    360         struct timeval          timeout; 
    361         fd_set                  rfds; 
    362         u_char                  buf; 
    363360 
    364361        for (;;) { 
    365                 FD_ZERO(&rfds); 
    366                 FD_SET(gpipe[0], &rfds); 
    367  
    368                 timeout.tv_sec  = 10; 
    369                 timeout.tv_usec = 0; 
    370  
    371                 switch (select(gpipe[0]+1, &rfds, NULL, NULL, &timeout)) { 
    372                 case -1: 
    373                         fprintf(stderr, "Error - select() failed: %s.\n", strerror(errno)); 
     362                if (sem_wait(&siggen_semaphore) == -1) { 
     363                        fprintf(stderr, "Error - Unable to decrement semaphore: %s.\n", strerror(errno)); 
    374364                        exit(EXIT_FAILURE); 
    375                 case 0: 
    376                         break; 
    377                 default: 
    378                         if (FD_ISSET(gpipe[0], &rfds)) { 
    379                                 if (read(gpipe[0], &buf, 1) != 1) { 
    380                                         fprintf(stderr, "Error - Unable to read from pipe: %s.\n", strerror(errno)); 
    381                                         exit(EXIT_FAILURE); 
    382                                 } 
    383                         } 
    384                         pthread_mutex_lock(&siggenq_mutex); 
    385                         elem = queue_cuthead(siggenq); 
    386                         pthread_mutex_unlock(&siggenq_mutex); 
    387  
    388                         if (elem != NULL) { 
    389                                 gensig(elem->data); 
    390                                 free(elem); 
    391                         } 
    392  
    393                         break; 
     365                } 
     366 
     367                pthread_mutex_lock(&siggenq_mutex); 
     368                elem = queue_cuthead(siggenq); 
     369                pthread_mutex_unlock(&siggenq_mutex); 
     370 
     371                if (elem) { 
     372                        gensig(elem->data); 
     373                        free(elem); 
    394374                } 
    395375        } 
  • nebula/trunk/src/signals.c

    r1618 r1620  
    9595        u_char                  i; 
    9696        struct sigaction        s_action; 
    97 /* 
    9897        static int              termsigs[] = { 
    9998#ifdef HAVE_SIGBUS 
     
    104103                SIGINT, 
    105104                SIGQUIT, 
    106 //            SIGSEGV, 
     105              SIGSEGV, 
    107106                SIGTERM 
    108107        }; 
    109 */ 
    110108 
    111109 
     
    124122 
    125123 
    126 /* 
    127124        // install handler for signals which terminate the process  
    128125        memset(&s_action, 0, sizeof(struct sigaction)); 
     
    137134                } 
    138135        } 
    139 */ 
    140136 
    141137        return; 
  • nebula/trunk/src/util.c

    r1618 r1620  
    2020 
    2121#include <dirent.h> 
     22#include <errno.h> 
    2223#include <fcntl.h> 
    2324#include <math.h> 
     
    126127        trie_delete(spamsum_trie.childlist, spamsum_trie.childlist_len, NULL); 
    127128        trie_delete(sighash_trie.childlist, sighash_trie.childlist_len, NULL); 
     129 
     130        // destroy semaphores 
     131        if (sem_getvalue(&sessions_semaphore, &i) == -1) { 
     132                fprintf(stderr, "Error - Unable to get semaphore value: %s.\n", strerror(errno)); 
     133                exit(EXIT_FAILURE); 
     134        } 
     135        for (; i; i--) sem_wait(&sessions_semaphore); 
     136        if (sem_destroy(&sessions_semaphore) == -1) { 
     137                fprintf(stderr, "Error - Unable to destroy semaphore: %s.\n", strerror(errno)); 
     138                exit(EXIT_FAILURE); 
     139        } 
    128140         
     141        if (sem_getvalue(&siggen_semaphore, &i) == -1) { 
     142                fprintf(stderr, "Error - Unable to get semaphore value: %s.\n", strerror(errno)); 
     143                exit(EXIT_FAILURE); 
     144        } 
     145        for (; i; i--) sem_wait(&siggen_semaphore); 
     146        if (sem_destroy(&siggen_semaphore) == -1) { 
     147                fprintf(stderr, "Error - Unable to destroy semaphore: %s.\n", strerror(errno)); 
     148                exit(EXIT_FAILURE); 
     149        } 
     150 
     151        // destroy mutexes and rwlocks 
    129152        pthread_mutex_destroy(&siggen_mutex); 
    130153        pthread_mutex_destroy(&sigwrite_mutex);