Changeset 1566

Show
Ignore:
Timestamp:
02/24/08 17:07:47 (8 months ago)
Author:
till
Message:

nebula
- synchronized threading
- dynamic cluster size thresholds

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • nebula/trunk/src/classify.c

    r1563 r1566  
    3434#include <ctype.h> 
    3535 
     36 
    3637// pthread wrapper function 
    3738void *pt_classify(void *s) { 
    38         int rv; 
     39        int     rv; 
     40 
     41        // this thread needs to unlock the sessions mutex 
     42        // so that the session ID can be reused in the main thread 
     43        pthread_mutex_unlock(&sessions_mutex); 
    3944 
    4045        // call classification function 
    41         if ((rv = classify((submission *) s)) == -1) { 
     46        if ((rv = classify(s)) == -1) { 
    4247                fprintf(stderr, "Error - Unable to classify submission.\n"); 
    4348                return((void *) -1); 
    4449        } else if (rv == 0) { 
    4550                // known attack, do nothing 
     51                return((void *) 0); 
    4652        } else { 
    47                 // new attack, keep hash pointer by NULL'ing session hash pointer 
    48                 ((submission *) s)->md5sum = NULL
    49         } 
    50         return((void *) 0); 
     53                // new attack 
     54                free(s)
     55        } 
     56        return((void *) 1); 
    5157} 
    5258 
     
    6066        double          score; 
    6167        pthread_t       ntid; 
     68        pthread_attr_t  ptattr; 
    6269 
    6370        tmpbuf  = NULL; 
    6471 
     72        if (!s) { 
     73                fprintf(stderr, "Error - Cannot classify empty session.\n"); 
     74                return(-1); 
     75        } 
     76 
    6577        pthread_rwlock_rdlock(&md5sum_trie_lock); 
    66         if ((t = trie_find_memstr(&md5sum_trie, (u_char *) s->md5sum, strlen(s->md5sum))) != NULL) { 
     78        t = trie_find_memstr(&md5sum_trie, (u_char *) s->md5sum, strlen(s->md5sum)); 
     79        pthread_rwlock_unlock(&md5sum_trie_lock); 
     80 
     81        if (t != NULL) { 
    6782                // md5sum is already in trie 
    6883                ((hash*)t->data)->cnt++; 
    69                 pthread_rwlock_unlock(&md5sum_trie_lock); 
    7084                 
    7185                if (verbose) printf("    md5sum is %s (%u instances)\n", ((hash*)t->data)->md5sum, ((hash*)t->data)->cnt); 
     
    7488        } else { 
    7589                // md5sum not in trie, create new element 
    76                 pthread_rwlock_unlock(&md5sum_trie_lock); 
    7790                pthread_rwlock_wrlock(&md5sum_trie_lock); 
    7891                t = trie_memins(&md5sum_trie, (u_char *) s->md5sum, strlen(s->md5sum), NULL); 
     92                pthread_rwlock_unlock(&md5sum_trie_lock); 
     93 
    7994                if ((t->data = calloc(1, sizeof(hash))) == NULL) { 
    8095                        fprintf(stderr, "Error - Unable to allocate memory: %m.\n"); 
     
    90105                memcpy(((hash*)t->data)->submission, s, sizeof(submission)); 
    91106                ((hash*)t->data)->cnt++; 
    92                 pthread_rwlock_unlock(&md5sum_trie_lock); 
    93  
    94 /* 
    95                 // set filename 
    96                 if ((((hash*)t->data)->filename = strdup(filename)) == NULL) { 
    97                         fprintf(stderr, "Error - Unable to allocate memory: %m.\n"); 
    98                         exit(EXIT_FAILURE); 
    99                 } 
    100 */ 
    101107 
    102108                // set spamsum hash 
     
    106112                } 
    107113                if (verbose) printf("    md5sum is %s (%u instances)\n", ((hash*)t->data)->md5sum, ((hash*)t->data)->cnt); 
     114 
     115 
     116                Q_WLOCK(&clusterq->lock); 
    108117 
    109118                // connect all clusters within range 
     
    113122                                        if (!((hash*)t->data)->cl) { 
    114123                                                add_entry_to_cluster((cluster *)cur_cqelem->data, (hash*)t->data); 
     124                                                if (verbose) printf("  cluster has now %u elements.\n", ((hash*)t->data)->cl->cnt); 
    115125                                                break; 
    116126                                        } else { 
     
    127137                        if ((score = spamsum_match(((hash*)t->data)->spamsum, ((hash*)cur_hqelem->data)->spamsum)) >= cluster_radius) { 
    128138                                // unlink match from outlier list 
     139 
    129140                                tmp_hqelem = cur_hqelem; 
    130141                                cur_hqelem = cur_hqelem->next; 
     
    136147                                        // add other outliers to cluster 
    137148                                        if (add_entry_to_cluster(((hash*)t->data)->cl, tmp_hash) == NULL) { 
     149 
    138150                                                pthread_rwlock_wrlock(&md5sum_trie_lock); 
     151                                                trie_del_memstr(&md5sum_trie, (u_char *) tmp_hash->md5sum, strlen(tmp_hash->md5sum)); 
     152                                                pthread_rwlock_unlock(&md5sum_trie_lock); 
     153 
    139154                                                pthread_rwlock_wrlock(&spamsum_trie_lock); 
    140  
    141                                                 trie_del_memstr(&md5sum_trie, (u_char *) tmp_hash->md5sum, strlen(tmp_hash->md5sum)); 
    142155                                                trie_del_memstr(&spamsum_trie, (u_char *) tmp_hash->spamsum, strlen(tmp_hash->spamsum)); 
     156                                                pthread_rwlock_unlock(&spamsum_trie_lock); 
     157 
    143158                                                hash_free(tmp_hash); 
    144159 
    145                                                 pthread_rwlock_unlock(&md5sum_trie_lock); 
    146                                                 pthread_rwlock_unlock(&spamsum_trie_lock); 
    147  
    148                                         } 
     160 
     161                                        } else if (verbose) printf("  cluster has now %u elements.\n", ((hash*)t->data)->cl->cnt); 
    149162                                } else { 
    150163                                        // create new cluster of two outliers 
     
    154167                                                /* first remove hashes from tries */ 
    155168                                                for (tmp_hqelem = ((cluster *)tmp_cqelem->data)->hq->head; tmp_hqelem; tmp_hqelem=tmp_hqelem->next) { 
     169 
    156170                                                        pthread_rwlock_wrlock(&md5sum_trie_lock); 
    157171                                                        trie_del_memstr(&md5sum_trie, (u_char *) ((hash *)tmp_hqelem->data)->md5sum, 
     
    171185                                         * we need this to be able to unlink a cluster from the queue */ 
    172186                                        ((cluster *) clusterq->head->data)->parent = clusterq->head; 
     187 
     188                                        if (verbose) printf("  cluster created.\n"); 
    173189                                } 
    174190                        } 
    175191                        if (!cur_hqelem) break; 
    176192                } 
     193 
     194 
     195                Q_ULOCK(&clusterq->lock); 
     196 
    177197 
    178198                         
     
    191211                                trie_del_memstr(&spamsum_trie, (u_char *) tmp_hash->spamsum, strlen(tmp_hash->spamsum)); 
    192212                                pthread_rwlock_unlock(&spamsum_trie_lock); 
     213 
    193214                                hash_free(tmp_hash); 
    194215                        } 
    195216                        queue_ins(outlierq, t->data, outlierq_max); 
     217 
     218                        if (verbose) printf("  input added to outlier queue.\n"); 
    196219                } 
    197220        } 
    198221 
    199222        // check for signature generation criteria here 
    200         if ((((hash*)t->data)->cl) && ((hash*)t->data)->cl->cnt > 5) { 
    201                         printf("[=] cluster hit size threshold, generating signature.\n"); 
    202                         if (pthread_create(&ntid, NULL, pt_siggen, (void *) ((hash*)t->data)->cl)) { 
    203                                 fprintf(stderr, "Error - Cannot create signature generation thread: %s.\n", strerror(errno)); 
    204                                 exit(EXIT_FAILURE); 
    205                         } 
     223        if ((((hash*)t->data)->cl) && ((hash*)t->data)->cl->cnt > ((hash*)t->data)->cl->threshold) { 
     224                printf("[=] cluster size (%u) hit threshold (%lu), generating signature.\n", 
     225                        ((hash*)t->data)->cl->cnt, ((hash*)t->data)->cl->threshold); 
     226 
     227                // set threshold to current size plus 1 
     228                ((hash*)t->data)->cl->threshold = ((hash*)t->data)->cl->cnt + 1; 
     229 
     230                if (pthread_attr_init(&ptattr) || 
     231                    pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 
     232                        fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 
     233                        exit(EXIT_FAILURE); 
     234                } 
     235                if (pthread_create(&ntid, NULL, pt_siggen, (void *) ((hash*)t->data)->cl)) { 
     236                        fprintf(stderr, "Error - Cannot create signature generation thread: %s.\n", strerror(errno)); 
     237                        exit(EXIT_FAILURE); 
     238                } 
     239                pthread_attr_destroy(&ptattr); 
    206240        } 
    207241 
  • nebula/trunk/src/cluster.c

    r1558 r1566  
    3737        } 
    3838 
     39        if (pthread_rwlock_init(&new->lock, NULL) != 0) { 
     40                fprintf(stderr, "Error - Unable to initialize thread lock: %m.\n"); 
     41                exit(EXIT_FAILURE); 
     42        } 
     43 
    3944        /* create element queue */ 
    4045        if ((new->hq = calloc(1, sizeof(queue))) == NULL) { 
     
    4247                exit(EXIT_FAILURE); 
    4348        } 
     49 
     50        // initialize size threshold 
     51        new->threshold = initial_threshold; 
    4452 
    4553        return(new); 
     
    5159        if (!cl) return; 
    5260 
     61        if (lock_mutex) CL_WLOCK(cl); 
    5362        queue_free(((cluster *)cl)->hq, hash_free); 
     63        if (lock_mutex) CL_ULOCK(cl); 
    5464        free(cl); 
    5565 
     
    6171        qelem   *new; 
    6272 
     73        CL_WLOCK(cl); 
    6374        if (cl->hq->size < clusterhashq_max) { 
    6475                // set cluster pointer in hash struct 
     
    7586                return(NULL); 
    7687        } 
     88        CL_ULOCK(cl); 
    7789 
    7890        return(cl); 
     
    93105                qelem   *entry; 
    94106 
     107                CL_WLOCK(dst); 
     108                CL_WLOCK(src); 
     109 
    95110                // concatenate hash queues 
    96111                dst->hq->tail->next = src->hq->head; 
     
    98113                dst->hq->size += src->hq->size; 
    99114 
     115                // add thresholds 
     116                dst->threshold += max(src->threshold, src->cnt); 
     117 
    100118                // let elements of src point to dst 
    101119                for (entry = src->hq->head; entry; entry = entry->next) ((hash*)entry->data)->cl = dst; 
    102120                src->hq->head = src->hq->tail = NULL; 
     121 
     122                CL_ULOCK(dst); 
     123                CL_ULOCK(src); 
    103124 
    104125                // unlink and free old cluster 
  • nebula/trunk/src/cluster.h

    r1560 r1566  
    2626#endif 
    2727 
     28#include <pthread.h> 
    2829#include <stdlib.h> 
    2930 
     
    3233#include "queue.h" 
    3334 
     35 
     36#define CL_WLOCK(x)     pthread_rwlock_wrlock(&((cluster *)x)->lock); 
     37#define CL_RLOCK(x)     pthread_rwlock_rdlock(&((cluster *)x)->lock); 
     38#define CL_ULOCK(x)     pthread_rwlock_unlock(&((cluster *)x)->lock); 
     39 
     40 
    3441/* this struct can be extended when using different metrics, 
    3542 * i.e. you may want to add a center when using the cosine measure 
    3643 * for ngram vectors */ 
    3744typedef struct cluster { 
    38         u_int32_t       cnt; 
    39         u_int32_t       sig_id; 
    40         u_int32_t       sig_rev; 
    41         void            *parent; 
    42         struct cluster  *prev; 
    43         struct cluster  *next; 
    44         queue           *hq; 
     45        pthread_rwlock_t        lock; 
     46        u_int32_t               cnt; 
     47        u_int32_t               sig_id; 
     48        u_int32_t               sig_rev; 
     49        unsigned long int       threshold; 
     50        void                    *parent; 
     51        struct cluster          *prev; 
     52        struct cluster          *next; 
     53        queue                   *hq; 
    4554} cluster; 
    4655 
  • nebula/trunk/src/nebula.c

    r1563 r1566  
    5151 
    5252 
    53 #define POLLFD_SET_SIZE 10 
    54  
    55  
    5653void usage(const char* progname, const int exit_val) { 
    5754        printf("Usage: %s " 
    5855                " -a <filename>\t append new snort signatures to this file\n" 
    59                 "\t\t-C <size>\t cluster queue size\n" 
     56                "\t\t -C <size>\t cluster queue size\n" 
    6057                "\t\t -c <similarity> cluster criteria (a similarity measure in percent)\n" 
    6158                "\t\t -d\t\t daemonize\n" 
     
    6764                "\t\t -r <snort pid>\t send a SIGHUP to this process ID after a new rule was generated\n" 
    6865                "\t\t -s <secret>\t secret string for use in submissions\n" 
     66                "\t\t -t <threshold>\t initial cluster element threshold\n" 
    6967                "\t\t -v\t\t be verbose\n" 
    7068                "\t\t\t\t [files ...]\n", 
     
    7472} 
    7573 
     74 
    7675int main(int argc, char *argv[]) { 
    7776        int             i, qsize, port, listen_fd, rv; 
    78         struct pollfd   pfdset[POLLFD_SET_SIZE]; 
    7977        u_char          daemonize; 
    8078        char            option; 
    81         submission      s[POLLFD_SET_SIZE-1], tmp_submission; 
    8279        pthread_t       ntid; 
    83  
    84  
    85         memset(pfdset, 0, sizeof(struct pollfd) * POLLFD_SET_SIZE); 
    86         memset(s, 0, sizeof(submission) * (POLLFD_SET_SIZE-1)); 
     80        submission      *tmp_submission, s[POLLFD_SET_SIZE]; 
     81        pthread_attr_t  ptattr; 
     82 
     83 
     84        memset(pfdset, -1, sizeof(struct pollfd) * (POLLFD_SET_SIZE+1)); 
     85        memset(s, 0, sizeof(submission) * POLLFD_SET_SIZE); 
    8786 
    8887        global_sid              = 2000000; 
     
    9493        i                       = 0; 
    9594        qsize                   = 0; 
     95 
     96        lock_mutex              = 1;    // if this is set, threads will lock mutexes 
    9697 
    9798 
     
    107108        secret                  = NULL;         // s: NULL 
    108109        verbose                 = 0;            // v: don't be verbose 
     110        initial_threshold       = 30;           // initial cluster element threshold for signature generation 
    109111 
    110112        memset(&md5sum_trie, 0, sizeof(trie_node)); 
     
    112114        memset(&sighash_trie, 0, sizeof(trie_node)); 
    113115 
     116        // initialize thread locks 
    114117        if (pthread_rwlock_init(&md5sum_trie_lock, NULL) || 
    115118            pthread_rwlock_init(&spamsum_trie_lock, NULL) || 
    116119            pthread_rwlock_init(&sighash_trie_lock, NULL) || 
    117             pthread_rwlock_init(&sidlock, NULL)) { 
    118                 fprintf(stderr, "Error - Unable to initialize queue mutex: %m.\n"); 
     120            pthread_rwlock_init(&sidlock, NULL) || 
     121            pthread_mutex_init(&sessions_mutex, NULL) || 
     122            pthread_mutex_init(&siggen_mutex, NULL) || 
     123            pthread_mutex_init(&sigwrite_mutex, NULL)) { 
     124                fprintf(stderr, "Error - Unable to initialize thread lock: %m.\n"); 
    119125                exit(EXIT_FAILURE); 
    120126        } 
     
    123129 
    124130        // process args 
    125         while((option = getopt(argc, argv, "a:c:C:dE:hi:O:p:r:s:v?")) > 0) { 
     131        while((option = getopt(argc, argv, "a:c:C:dE:hi:O:p:r:s:t:v?")) > 0) { 
    126132                switch(option) { 
    127133                        case 'a': 
     
    183189                                secret = optarg; 
    184190                                break; 
     191                        case 't': 
     192                                initial_threshold = strtoul(optarg, NULL, 10); 
     193                                if (initial_threshold < 2) { 
     194                                        fprintf(stderr, "Error - Invalid initial threshold.\n"); 
     195                                        exit(EXIT_FAILURE); 
     196                                } 
     197                                break; 
    185198                        case 'v': 
    186199                                verbose++; 
     
    194207        } 
    195208 
     209        // set up signal handling 
    196210        set_signal_handlers(); 
     211 
     212        // initialize HMAC pads 
     213        memset(k_ipad, IPAD_VAL, HMAC_BLOCK_SIZE); 
     214        memset(k_opad, OPAD_VAL, HMAC_BLOCK_SIZE); 
     215        if (secret) for (i=0; i<strlen(secret); i++) { 
     216                k_ipad[i] ^= secret[i]; 
     217                k_opad[i] ^= secret[i]; 
     218        } 
    197219 
    198220        // initialize queues 
     
    208230        } else if (verbose) printf("  Submission secret: %s\n", secret); 
    209231 
    210         if (snort_pid && !rules_file) { 
    211                 printf("  Warning - Snort will not be notified about updated rules: No rule filename given.\n"); 
    212         } else if (!snort_pid && rules_file) { 
    213                 printf("  Warning - Snort will not be notified about updated rules: No pid given.\n"); 
    214         } else if (verbose) { 
    215                printf("  Notifying snort about rule file updates in %s.\n", rules_file); 
    216         } 
    217  
    218         if (verbose) printf("  Initial snort signature ID: %u\n\n", global_sid); 
     232        if (rules_file && verbose) 
     233                printf("  Appending new rules to %s.\n", rules_file); 
     234        if (snort_pid && verbose) 
     235                printf("  Forcing snort (process ID %u) reload on new rules.\n", snort_pid); 
     236 
     237        if (verbose) { 
     238               printf("  Initial snort signature ID: %u\n\n", global_sid); 
     239                printf("  Initial cluster size threshold: %lu\n\n", initial_threshold); 
     240        } 
    219241 
    220242 
     
    222244        printf("[*] Ready.\n"); 
    223245 
    224         pfdset[0].fd          = listen_fd; 
    225         pfdset[0].events      = POLLIN; 
     246        LISTEN_SOCK.fd                = listen_fd; 
     247        LISTEN_SOCK.events    = POLLIN; 
    226248        for(;;) { 
    227                 switch (rv = poll(pfdset, POLLFD_SET_SIZE, -1)) { 
     249                switch (rv = poll(pfdset, POLLFD_SET_SIZE+1, -1)) { 
    228250                case -1: 
    229                         fprintf(stderr, "Error with select(): %s.\n", strerror(errno)); 
     251                        if (errno == EINTR) break; 
     252                        fprintf(stderr, "Error - Unable to poll sockets: %s.\n", strerror(errno)); 
    230253                        exit(1); 
    231254                case  0: 
     
    234257                        // check listen_fd 
    235258 
    236                         if (pfdset[0].revents) { 
    237                                 if (pfdset[0].revents & POLLERR) { 
     259                        if (LISTEN_SOCK.revents) { 
     260                                if (LISTEN_SOCK.revents & POLLERR) { 
    238261                                        fprintf(stderr, "Error - Unable to poll listening socket: %s.\n", strerror(errno)); 
    239262                                        exit(EXIT_FAILURE); 
    240                                 } else if (pfdset[0].revents & POLLHUP) { 
     263                                } else if (LISTEN_SOCK.revents & POLLHUP) { 
    241264                                        fprintf(stderr, "Error - Listening socket hangup.\n"); 
    242265                                        exit(EXIT_FAILURE); 
    243                                 } else if (pfdset[0].revents & POLLNVAL) { 
     266                                } else if (LISTEN_SOCK.revents & POLLNVAL) { 
    244267                                        fprintf(stderr, "Error - Listening socket descriptor is invalid.\n"); 
    245268                                        exit(EXIT_FAILURE); 
    246269                                } 
    247270 
    248                                 if (pfdset[0].revents & POLLIN) { 
     271                                if (LISTEN_SOCK.revents & POLLIN) { 
    249272                                        // incoming connection, find next free place in poll fd set 
    250273                                 
    251                                         for (i=1; i<POLLFD_SET_SIZE; i++) if (!pfdset[i].events) break; 
    252                                         pfdset[i].fd            = net_accept(pfdset[0].fd); 
    253                                         pfdset[i].events        = POLLIN; 
    254  
    255                                         if (verbose) printf("Connection accepted.\n"); 
     274                                        for (i=0; i<POLLFD_SET_SIZE; i++) { 
     275                                                if (pfdset[i].fd <= 0) { 
     276                                                        pthread_mutex_lock(&sessions_mutex); 
     277 
     278                                                        pfdset[i].fd            = net_accept(LISTEN_SOCK.fd); 
     279                                                        pfdset[i].events        = POLLOUT; 
     280 
     281                                                        memset(&s[i], 0, sizeof(submission)); 
     282 
     283                                                        pthread_mutex_unlock(&sessions_mutex); 
     284 
     285                                                        if (verbose) printf("[>] Connection accepted.\n"); 
     286                                                        break; 
     287                                                } 
     288                                        }        
    256289                                } 
    257290                        } 
    258                         for (i=1; i<POLLFD_SET_SIZE; i++) { 
    259                                 if (pfdset[i].revents & POLLIN) { 
     291                        for (i=0; i<POLLFD_SET_SIZE; i++) { 
     292                                if (pfdset[i].revents & POLLOUT) { 
    260293                                        switch (session_handle_data(&pfdset[i], &s[i])) { 
    261                                         case 1: 
    262                                                 // create submission copy so that we can reset the session  
    263                                                 memcpy(&tmp_submission, &s[i], sizeof(submission)); 
    264                                                 s[i].attack     = NULL; 
    265                                                 s[i].cattack    = NULL; 
    266                                                 s[i].md5sum     = NULL; 
    267  
    268                                                 // create clustering thread 
    269                                                 if (pthread_create(&ntid, NULL, pt_classify, (void *) &tmp_submission)) { 
    270                                                         fprintf(stderr, "Error - Cannot create clustering thread: %s.\n", strerror(errno)); 
    271                                                         exit(EXIT_FAILURE); 
    272                                                 } 
    273  
    274                                                 // reset session 
    275                                                 session_reset(&s[i], &pfdset[i]); 
    276                                                 break; 
    277294                                        case -1: 
    278295                                                fprintf(stderr, "Error - Invalid submission state. Terminating session.\n"); 
     296                                                session_reset(&s[i], &pfdset[i]); 
     297                                                break; 
     298                                        case 0: 
     299                                                session_reset(&s[i], &pfdset[i]); 
    279300                                                break; 
    280301                                        default: 
     302                                                pfdset[i].events = POLLIN; 
    281303                                                break; 
     304                                        } 
     305                                } 
     306                                if (pfdset[i].revents) { 
     307                                        if (pfdset[i].revents & POLLIN) { 
     308                                                switch (session_handle_data(&pfdset[i], &s[i])) { 
     309                                                case 2: 
     310                                                        // need more data to complete session 
     311                                                        break; 
     312                                                case 1: 
     313                                                        // create clustering thread 
     314                                                        if ((tmp_submission = calloc(1, sizeof(submission))) == NULL) { 
     315                                                                fprintf(stderr, "Error - Unable to allocate memory: %s.\n", strerror(errno)); 
     316                                                                exit(EXIT_FAILURE); 
     317                                                        } 
     318                                                        memcpy(tmp_submission, &s[i], sizeof(submission)); 
     319                                                        memset(&s[i], 0, sizeof(submission)); 
     320 
     321                                                        if (pthread_attr_init(&ptattr) || 
     322                                                            pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 
     323                                                                fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 
     324                                                                exit(EXIT_FAILURE); 
     325                                                        } 
     326                                                        pthread_mutex_lock(&sessions_mutex); 
     327                                                        if (pthread_create(&ntid, &ptattr, pt_classify, (void *) tmp_submission)) { 
     328                                                                fprintf(stderr, "Error - Cannot create clustering thread: %s.\n", strerror(errno)); 
     329                                                                exit(EXIT_FAILURE); 
     330                                                        } 
     331                                                        pthread_attr_destroy(&ptattr); 
     332 
     333                                                        session_reset(&s[i], &pfdset[i]); 
     334                                                        break; 
     335                                                case 0: 
     336                                                        // connection closed 
     337                                                        session_reset(&s[i], &pfdset[i]); 
     338                                                        break; 
     339                                                case -1: 
     340                                                        // program error 
     341                                                        fprintf(stderr, "Error - Terminating session.\n"); 
     342                                                        session_reset(&s[i], &pfdset[i]); 
     343                                                        break; 
     344                                                case -2: 
     345                                                        // connection state error 
     346                                                        fprintf(stderr, "Error - Invalid connection state.\n"); 
     347                                                        session_reset(&s[i], &pfdset[i]); 
     348                                                        break; 
     349                                                default: 
     350                                                        break; 
     351                                                } 
     352                                        } else if (pfdset[i].revents & POLLERR && errno != EINTR) { 
     353                                                fprintf(stderr, "Error - Unable to poll socket: %s.\n", strerror(errno)); 
     354                                                session_reset(&s[i], &pfdset[i]); 
     355                                        } else if (pfdset[i].revents & POLLHUP) { 
     356                                                fprintf(stderr, "Error - Socket hangup.\n"); 
     357                                                session_reset(&s[i], &pfdset[i]); 
     358                                        } else if (pfdset[i].revents & POLLNVAL) { 
     359                                                fprintf(stderr, "Error - Socket descriptor is invalid.\n"); 
     360                                                session_reset(&s[i], &pfdset[i]); 
    282361                                        } 
    283362                                } 
  • nebula/trunk/src/nebula.h

    r1560 r1566  
    3232#include "cluster.h" 
    3333#include "hash.h" 
     34#include "session.h" 
    3435#include "stree.h" 
     36 
     37 
     38#define POLLFD_SET_SIZE 1023 
     39#define LISTEN_SOCK     pfdset[POLLFD_SET_SIZE] 
     40 
     41struct pollfd   pfdset[POLLFD_SET_SIZE+1]; 
     42 
     43int             lock_mutex; 
    3544 
    3645u_char          verbose; 
     
    4150double          cluster_radius; 
    4251 
     52unsigned long int       initial_threshold; 
     53 
    4354queue           *clusterq; 
    4455queue           *outlierq; 
    4556queue           *aconnq; 
    4657 
    47 pthread_rwlock_t md5sum_trie_lock; 
    48 pthread_rwlock_t spamsum_trie_lock; 
    49 pthread_rwlock_t sighash_trie_lock; 
     58pthread_mutex_t         sessions_mutex; 
     59pthread_rwlock_t        md5sum_trie_lock; 
     60pthread_rwlock_t        spamsum_trie_lock; 
     61pthread_rwlock_t        sighash_trie_lock; 
    5062 
    5163#endif 
  • nebula/trunk/src/queue.c

    r1558 r1566  
    3232        if (!q || !data) return(NULL); 
    3333 
    34         pthread_rwlock_wrlock(&q->lock); 
    35  
    3634        if ((new = calloc(1, sizeof(qelem))) == NULL) return(NULL); 
    3735        new->data       = data; 
     
    4644        q->size++; 
    4745 
    48         pthread_rwlock_unlock(&q->lock); 
    4946        return(new); 
    5047} 
     
    5552         
    5653        if (!q || !data) return(NULL); 
    57  
    58         pthread_rwlock_wrlock(&q->lock); 
    5954 
    6055        if ((new = calloc(1, sizeof(qelem))) == NULL) return(NULL); 
     
    7065        q->size++; 
    7166 
    72         pthread_rwlock_unlock(&q->lock); 
    7367        return(new); 
    7468} 
     
    8074        if (!q || q->head == NULL) return(NULL); 
    8175 
    82         pthread_rwlock_wrlock(&q->lock); 
    83  
    8476        tmp = q->head; 
    8577        q->head = q->head->next; 
     
    8880        q->size--; 
    8981 
    90         pthread_rwlock_unlock(&q->lock); 
    9182        return(tmp); 
    9283} 
     
    9889        if (!q || q->tail == NULL) return(NULL); 
    9990 
    100         pthread_rwlock_wrlock(&q->lock); 
    101  
    10291        tmp = q->tail; 
    10392        q->tail = q->tail->prev; 
     
    10695        q->size--; 
    10796 
    108         pthread_rwlock_unlock(&q->lock); 
    10997        return(tmp); 
    11098} 
     
    136124                e = queue_cuttail(q); 
    137125        } else { 
    138                 pthread_rwlock_wrlock(&q->lock); 
    139  
    140126                e->prev->next = e->next; 
    141127                e->next->prev = e->prev; 
    142128                if (!q->size--) q->head = q->tail = NULL; 
    143  
    144                 pthread_rwlock_unlock(&q->lock); 
    145129        } 
    146130 
     
    159143        } 
    160144        if (pthread_rwlock_init(&q->lock, NULL) != 0) { 
    161                 fprintf(stderr, "Error - Unable to initialize queue mutex: %m.\n"); 
     145                fprintf(stderr, "Error - Unable to initialize thread lock: %m.\n"); 
    162146                exit(EXIT_FAILURE); 
    163147        } 
     
    172156        if (!q) return; 
    173157 
    174         pthread_rwlock_wrlock(&q->lock); 
     158        if (lock_mutex) pthread_rwlock_wrlock(&q->lock); 
    175159 
    176160        while (q->head) { 
     
    181165        } 
    182166 
    183         pthread_rwlock_unlock(&q->lock); 
     167        if (lock_mutex) pthread_rwlock_unlock(&q->lock); 
    184168        pthread_rwlock_destroy(&q->lock); 
    185169 
  • nebula/trunk/src/queue.h

    r1558 r1566  
    2828#include <pthread.h> 
    2929 
     30#define Q_WLOCK(x)      pthread_rwlock_wrlock(&((queue *)x)->lock); 
     31#define Q_RLOCK(x)      pthread_rwlock_rdlock(&((queue *)x)->lock); 
     32#define Q_ULOCK(x)      pthread_rwlock_unlock(&((queue *)x)->lock); 
     33 
    3034 
    3135typedef struct qelem { 
  • nebula/trunk/src/session.c

    r1563 r1566  
    3434 
    3535 
     36 
    3637void session_reset(submission *s, struct pollfd *pfd) { 
    3738        if (!s) return; 
     39 
     40        pthread_mutex_lock(&sessions_mutex); 
    3841 
    3942        free(s->cattack); 
    4043        free(s->attack); 
    4144        free(s->md5sum); 
     45        free(s->hmac); 
    4246        memset(s, 0, sizeof(submission)); 
    4347 
    4448        if (pfd) { 
    45                 close(pfd->fd); 
     49                if (pfd->fd >= 0) close(pfd->fd); 
     50                if (verbose) printf("[<] Connection terminated.\n"); 
     51 
    4652                memset(pfd, 0, sizeof(struct pollfd));