Changeset 1558

Show
Ignore:
Timestamp:
02/19/08 00:27:59 (6 months ago)
Author:
till
Message:

nebula
- first daemon version

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • nebula/trunk/Makefile.am

    r1524 r1558  
    11AUTOMAKE_OPTIONS = foreign 
    22 
    3 SUBDIRS = src gst 
     3SUBDIRS = src 
    44 
    55EXTRA_DIST = ChangeLog LICENSE README 
  • nebula/trunk/configure.in

    r1524 r1558  
    3737AC_PROG_INSTALL 
    3838AC_CONFIG_FILES([Makefile 
    39                  src/Makefile  
    40                  gst/Makefile])  
     39                 src/Makefile])  
    4140 
    4241AC_OUTPUT 
  • nebula/trunk/src/Makefile.am

    r1431 r1558  
    11AM_CFLAGS=-Wall -Werror 
    22 
    3 LIBS += -lm 
     3LIBS += -lm -lz -lpthread 
    44 
    55bin_PROGRAMS = nebula 
     
    1212                        hash.c hash.h \ 
    1313                        cluster.c cluster.h \ 
     14                        session.c session.h \ 
     15                        net.c net.h \ 
     16                        classify.c classify.h \ 
     17                        avl.c avl.h \ 
     18                        lca.c lca.h \ 
     19                        sig.c sig.h \ 
     20                        cstr.c cstr.h \ 
     21                        stree.c stree.h \ 
    1422                        nebula.c nebula.h 
  • nebula/trunk/src/cluster.c

    r1434 r1558  
    4848 
    4949/* delete cluster */ 
    50 void cluster_free(void *cl, u_char list_flag) { 
     50void cluster_free(void *cl) { 
    5151        if (!cl) return; 
    5252 
    53         if (list_flag & list_files) printf("-- Cluster has %u elements\n", (unsigned int) ((cluster *)cl)->hq->size); 
    54         queue_free(((cluster *)cl)->hq, list_flag & list_files, hash_free); 
     53        queue_free(((cluster *)cl)->hq, hash_free); 
    5554        free(cl); 
    5655 
     
    7170                        exit(EXIT_FAILURE); 
    7271                } 
     72                cl->cnt++; 
    7373        } else { 
    7474                if (verbose) fprintf(stderr, "Warning - Could not add hash to cluster queue: Maximum size reached.\n"); 
     
    103103 
    104104                // unlink and free old cluster 
    105                 queue_free(src->hq, 0, NULL); 
     105                queue_free(src->hq, NULL); 
    106106                entry = queue_unlink(q, src->parent); 
    107                 cluster_free(entry->data, 1); 
     107                cluster_free(entry->data); 
    108108                free(entry); 
    109109 
  • nebula/trunk/src/cluster.h

    r1432 r1558  
    4141        struct cluster *next; 
    4242        queue *hq; 
     43 
    4344} cluster; 
    4445 
    4546 
    46  
    4747inline cluster *cluster_new(void); 
    48 void cluster_free(void *cl, u_char list_flag); 
     48void cluster_free(void *cl); 
    4949 
    5050inline cluster *create_hashlist(hash *hl1, hash *hl2); 
     
    5252cluster *create_cluster(hash *l1, hash *l2); 
    5353cluster *clusters_merge(queue *q, cluster *dst, cluster *src); 
    54 void clusterlist_delete(cluster *list, u_char list_files); 
     54void clusterlist_delete(cluster *list); 
    5555 
    5656#endif 
  • nebula/trunk/src/hash.c

    r1432 r1558  
    2626#include "hash.h" 
    2727#include "nebula.h" 
     28#include "session.h" 
    2829 
    2930hash *hash_new(void) { 
     
    3738 
    3839 
    39 void hash_free(void *h, u_char list_flag) { 
     40void hash_free(void *h) { 
    4041        if (!h) return; 
    4142 
    42         if (list_flag && ((hash *)h)->filename) printf("    %s\n", ((hash *)h)->filename); 
    4343        free(((hash *)h)->md5sum); 
     44        ((hash *)h)->md5sum = NULL; 
     45        ((submission *)((hash *)h)->submission)->md5sum = NULL; 
     46 
    4447        free(((hash *)h)->spamsum); 
    45         free(((hash *)h)->filename); 
     48        ((hash *)h)->spamsum = NULL; 
     49 
     50        free(((hash *)h)->sha512sum); 
     51        ((hash *)h)->sha512sum = NULL; 
     52 
     53        session_reset(((hash *)h)->submission, NULL); 
     54        free(((hash *)h)->submission); 
     55        ((hash *)h)->submission = NULL; 
    4656 
    4757        if (((hash *)h)->free) ((hash *)h)->free(h); 
  • nebula/trunk/src/hash.h

    r1432 r1558  
    3030 
    3131typedef struct hash { 
    32         u_int16_t cnt; 
    33         u_int16_t hashlen; 
    34         u_int16_t filecnt; 
    35         char *md5sum; 
    36         char *spamsum; 
    37         char *filename; 
    38         struct cluster *cl; 
    39         void (*free)(struct hash *h); 
     32        u_int16_t       cnt; 
     33        u_int16_t       hashlen; 
     34        u_int16_t       filecnt; 
     35        char            *md5sum; 
     36        char            *spamsum; 
     37        char            *sha512sum; 
     38        void            *submission; 
     39        struct cluster  *cl; 
     40        void            (*free)(struct hash *h); 
    4041} hash; 
    4142 
    4243 
    4344hash *hash_new(void); 
    44 void hash_free(void *cl, u_char list_flag); 
     45void hash_free(void *cl); 
    4546 
    4647#endif 
  • nebula/trunk/src/nebula.c

    r1435 r1558  
    2323#include <fcntl.h> 
    2424#include <math.h> 
     25#include <poll.h> 
     26#include <pthread.h> 
    2527#include <stdlib.h> 
    2628#include <stdio.h> 
     
    3234#include <unistd.h> 
    3335 
     36#include "classify.h" 
    3437#include "cluster.h" 
    3538#include "md5.h" 
    3639#include "nebula.h" 
     40#include "net.h" 
    3741#include "ngram.h" 
     42#include "session.h" 
     43#include "sig.h" 
    3844#include "signals.h" 
    3945#include "spamsum.h" 
     
    4551 
    4652 
     53#define POLLFD_SET_SIZE 10 
     54 
     55 
    4756void usage(const char* progname, const int exit_val) { 
    4857        printf("Usage: %s " 
    4958                " -C <size>\t cluster queue size\n" 
     59                "\t\t -c <similarity> cluster criteria (a similarity measure in percent)\n" 
     60                "\t\t -d\t\t daemonize\n" 
    5061                "\t\t -E <size>\t cluster element queue size\n" 
     62                "\t\t -h\t\t show this help\n" 
    5163                "\t\t -O <size>\t outlier queue size\n" 
    52 #ifdef PROFILE 
    53                 "\t\t -a <seconds>\t alarm interval for profiling output\n" 
    54 #endif 
    55                 "\t\t -c <similarity> cluster criteria (a similarity measure in percent)\n" 
    56                 "\t\t -d <directory>\t process direcory content\n" 
    57                 "\t\t -h\t\t show this help\n" 
    58                 "\t\t -l\t\t list clustered files in result\n" 
    59                 "\t\t -o\t\t list outlier in result\n" 
    60                 "\t\t -p\t\t show progress\n" 
    61                 "\t\t -r\t\t hide result (only calculate cluster)\n" 
    62                 "\t\t -t\t\t sort input files by creation time\n" 
     64                "\t\t -p <port>\t listen on this port\n" 
     65                "\t\t -s <secret>\t secret string for use in submissions\n" 
    6366                "\t\t -v\t\t be verbose\n" 
    6467                "\t\t\t\t [files ...]\n", 
     
    6972 
    7073int main(int argc, char *argv[]) { 
    71 //int j; 
    72         int             i, qsize; 
    73         u_char          time_sort, *content, *tmpbuf; 
    74         char            option, *dirname, *filename; 
    75         hash            *tmp_hash; 
    76         qelem           *cur_cqelem, *tmp_cqelem, *cur_hqelem, *tmp_hqelem; 
    77         double          score; 
    78         trie_node       *t; 
    79         bstring         bstr; 
    80         int             n; 
    81  
    82  
    83         dirname                 = NULL; 
    84         namelist                = NULL; 
     74        int             i, qsize, port, listen_fd, rv; 
     75        struct pollfd   pfdset[POLLFD_SET_SIZE]; 
     76        u_char          daemonize; 
     77        char            option; 
     78        submission      s[POLLFD_SET_SIZE-1], tmp_submission; 
     79        pthread_t       ntid; 
     80 
     81 
     82        memset(pfdset, 0, sizeof(struct pollfd) * POLLFD_SET_SIZE); 
     83        memset(s, 0, sizeof(submission) * (POLLFD_SET_SIZE-1)); 
     84 
     85        global_sid              = 2000000; 
    8586 
    8687        outlierq                = NULL; 
    8788        clusterq                = NULL; 
    88  
    89         content                 = NULL; 
    90         num_of_files            = 0; 
    91         num_of_duplicates       = 0; 
     89        aconnq                  = NULL; 
     90 
    9291        i                       = 0; 
    9392        qsize                   = 0; 
    94         total_files             = 0; 
    95 #ifdef PROFILE 
    96         alarm_time              = 5; 
    97 #endif 
    9893 
    9994        /* default values for parameters */ 
    100         time_sort               = 0;    // don't process files chronologically 
    101         verbose                 = 0;    // don't be verbose 
    102         hide_result             = 0;    // show resulting clusters 
    103         show_progress           = 0;    // don't show progress dots 
    104         list_files              = 0;    // don't list cluster objects 
    105         list_outlier            = 0;    // don't list outlier  
    106         cluster_radius          = 95.0; // 95% similarity as cluster criteria 
    107         outlierq_max            = 500000; 
    108         clusterhashq_max        = 500000; 
    109         clusterq_max            = 5000; 
     95        secret                  = NULL;         // s: NULL 
     96        daemonize               = 0;            // d: 0 
     97        port                    = 12346;        // p: 12345 
     98        verbose                 = 0;            // v: don't be verbose 
     99        cluster_radius          = 95.0;         // c: 95% similarity as cluster criteria 
     100        outlierq_max            = 500000;       // O 
     101        clusterhashq_max        = 500000;       // E 
     102        clusterq_max            = 5000;         // C 
    110103 
    111104        memset(&md5sum_trie, 0, sizeof(trie_node)); 
    112105        memset(&spamsum_trie, 0, sizeof(trie_node)); 
    113106 
    114         printf("\n  Nebula %s Copyright (C) 2007 Tillmann Werner <tillmann.werner@gmx.de>\n\n", VERSION); 
     107        pthread_rwlock_init(&md5sum_trie_lock, NULL); 
     108        pthread_rwlock_init(&spamsum_trie_lock, NULL); 
     109        pthread_rwlock_init(&sidlock, NULL); 
     110 
     111        printf("\n  Nebula %s Copyright (C) 2007-2008 Tillmann Werner <tillmann.werner@gmx.de>\n\n", VERSION); 
    115112 
    116113        // process args 
    117 #ifdef PROFILE 
    118         while((option = getopt(argc, argv, "a:c:C:E:O:rtplovd:h?")) > 0) { 
    119 #else 
    120         while((option = getopt(argc, argv, "c:C:E:O:rtplovd:h?")) > 0) { 
    121 #endif 
     114        while((option = getopt(argc, argv, "c:C:dE:hO:p:s:v?")) > 0) { 
    122115                switch(option) { 
    123 #ifdef PROFILE 
    124                         case 'a': 
    125                                 alarm_time = atoi(optarg); 
    126                                 if (alarm_time < 0) { 
    127                                         fprintf(stderr, "Error - Profile time interval must be a non-negative value (0 means 'off').\n"); 
    128                                         exit(EXIT_FAILURE); 
    129                                 } 
    130                                 break; 
    131 #endif 
    132116                        case 'c': 
    133117                                cluster_radius = atof(optarg); 
     
    145129                                break; 
    146130                        case 'd': 
    147                                 if (chdir(dirname = optarg) != 0) { 
    148                                         fprintf(stderr, "Error - Unable to change into directory %s: %m.\n", dirname); 
    149                                         exit(EXIT_FAILURE); 
    150                                 } 
     131                                daemonize = 1; 
    151132                                break; 
    152133                        case 'E': 
     
    157138                                } 
    158139                                break; 
    159                         case 'l': 
    160                                 list_files = 1; 
    161                                 break; 
    162                         case 'o': 
    163                                 list_outlier = 1; 
    164                                 break; 
    165140                        case 'O': 
    166141                                outlierq_max = atoi(optarg); 
     
    171146                                break; 
    172147                        case 'p': 
    173                                 show_progress = 1; 
    174                                 break; 
    175                         case 'r': 
    176                                 hide_result = 1; 
    177                                 break; 
    178                         case 't': 
    179                                 time_sort = 1; 
     148                                port = atoi(optarg); 
     149                                if (!port || port > 65535) { 
     150                                        fprintf(stderr, "Error - Invalid port.\n"); 
     151                                        exit(EXIT_FAILURE); 
     152                                } 
     153                                break; 
     154                        case 's': 
     155                                secret = optarg; 
    180156                                break; 
    181157                        case 'v': 
    182                                 verbose = 1
     158                                verbose++
    183159                                break; 
    184160                        case 'h': 
     
    190166        } 
    191167 
    192         if (hide_result && list_files) list_files = 0; 
    193  
    194168        set_signal_handlers(); 
    195169 
    196  
    197         // if a directory is given, scan (and sort) its content 
    198         if (dirname) { 
    199                 printf("Scanning directory... "); 
    200                 fflush(stdout); 
    201                 if (time_sort) { 
    202                         if ((n = scandir(dirname, &namelist, regular_file, timesort)) == -1) { 
    203                                 fprintf(stderr, "Error - Unable to read directory entries: %m.\n"); 
    204                                 exit(EXIT_FAILURE); 
     170        // initialize queues 
     171        outlierq = queue_new(); 
     172        clusterq = queue_new(); 
     173 
     174 
     175        // bind to port 
     176        listen_fd = net_listen(port); 
     177 
     178        if (!secret) { 
     179                printf("Warning - No submission secret given.\n"); 
     180        } else if (verbose) printf("Submission secret: %s\n", secret); 
     181 
     182        // process incoming connections 
     183        printf("[*] Ready.\n"); 
     184 
     185        pfdset[0].fd            = listen_fd; 
     186        pfdset[0].events        = POLLIN; 
     187        for(;;) { 
     188                switch (rv = poll(pfdset, POLLFD_SET_SIZE, -1)) { 
     189                case -1: 
     190                        fprintf(stderr, "Error with select(): %s.\n", strerror(errno)); 
     191                        exit(1); 
     192                case  0: 
     193                        break; 
     194                default: 
     195                        // check listen_fd 
     196 
     197                        if (pfdset[0].revents) { 
     198                                if (pfdset[0].revents & POLLERR) { 
     199                                        fprintf(stderr, "Error - Unable to poll listening socket: %s.\n", strerror(errno)); 
     200                                        exit(EXIT_FAILURE); 
     201                                } else if (pfdset[0].revents & POLLHUP) { 
     202                                        fprintf(stderr, "Error - Listening socket hangup.\n"); 
     203                                        exit(EXIT_FAILURE); 
     204                                } else if (pfdset[0].revents & POLLNVAL) { 
     205                                        fprintf(stderr, "Error - Listening socket descriptor is invalid.\n"); 
     206                                        exit(EXIT_FAILURE); 
     207                                } 
     208 
     209                                if (pfdset[0].revents & POLLIN) { 
     210                                        // incoming connection, find next free place in poll fd set 
     211                                 
     212                                        for (i=1; i<POLLFD_SET_SIZE; i++) if (!pfdset[i].events) break; 
     213                                        pfdset[i].fd            = net_accept(pfdset[0].fd); 
     214                                        pfdset[i].events        = POLLIN; 
     215 
     216                                        if (verbose) printf("Connection accepted.\n"); 
     217                                } 
    205218                        } 
    206                 } else { 
    207                         if ((n = scandir(dirname, &namelist, regular_file, alphasort)) == -1) { 
    208                                 fprintf(stderr, "Error - Unable to read directory entries: %m.\n"); 
    209                                 exit(EXIT_FAILURE); 
     219                        for (i=1; i<POLLFD_SET_SIZE; i++) { 
     220                                if (pfdset[i].revents & POLLIN) { 
     221                                        switch (session_handle_data(&pfdset[i], &s[i])) { 
     222                                        case 1: 
     223                                                // create submission copy so that we can reset the session  
     224                                                memcpy(&tmp_submission, &s[i], sizeof(submission)); 
     225                                                s[i].secret     = NULL; 
     226                                                s[i].attack     = NULL; 
     227                                                s[i].cattack    = NULL; 
     228                                                s[i].md5sum     = NULL; 
     229 
     230                                                // create clustering thread 
     231                                                if (pthread_create(&ntid, NULL, pt_classify, (void *) &tmp_submission)) { 
     232                                                        fprintf(stderr, "Error - Cannot create clustering thread: %s.\n", strerror(errno)); 
     233                                                        exit(EXIT_FAILURE); 
     234                                                } 
     235 
     236                                                // reset session 
     237                                                session_reset(&s[i], &pfdset[i]); 
     238                                                break; 
     239                                        case -1: 
     240                                                fprintf(stderr, "Error - Invalid submission state. Terminating session.\n"); 
     241                                                break; 
     242                                        default: 
     243                                                break; 
     244                                        } 
     245                                } 
    210246                        } 
    211247                } 
    212                 total_files = n; 
    213         } else { 
    214                 if ((argc - optind) < 1) usage(argv[0], EXIT_FAILURE); 
    215                 i = optind; 
    216                 total_files = argc - i; 
    217                 if (time_sort) printf("Only directories (-d) can be processed chronologically (-t).\n"); 
    218248        } 
    219249 
    220         // initialize outlier queue 
    221         outlierq = queue_new(); 
    222         clusterq = queue_new(); 
    223  
    224  
    225 #ifdef PROFILE 
    226         if (alarm_time) { 
    227                 alarm(alarm_time); 
    228                 printf("Profiling enabled, printing statistics every %d seconds.\n", alarm_time); 
    229                 files_in_interval = 0; 
    230                 bytes_in_interval = 0; 
    231                 checkpoint = 0; 
    232         } 
    233 #endif 
    234  
    235         printf("processing %u files.\n", (unsigned int) total_files); 
    236         if (!verbose && show_progress) { 
    237                 printf("files processed:                       "); 
    238                 fflush(stdout); 
    239         } 
    240  
    241  
    242         for (n=0; n<total_files; n++) { 
    243                 filename = (dirname ? namelist[n]->d_name : argv[n+i]); 
    244  
    245                 bstr = bstr_map(filename); 
    246                 num_of_files++; 
    247 #ifdef PROFILE 
    248                 files_in_interval++; 
    249                 bytes_in_interval += bstr.len; 
    250 #endif 
    251  
    252                 if (verbose) printf("  processing file %s.\n", filename); 
    253                 else if (show_progress) { 
    254                         printf("\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b"); 
    255                         if (num_of_files < total_files) 
    256                                 printf("files processed: %05.2f%% (%04u clusters)", num_of_files/total_files*100, (unsigned int) clusterq->size); 
    257                         else 
    258                                 printf("files processed: 100.00%% (%04u clusters)\n", (unsigned int) clusterq->size); 
    259                         fflush(stdout); 
    260                 } 
    261  
    262  
    263                 // calculate md5sum 
    264                 if ((tmpbuf = (u_char *) mem_md5sum(bstr.data, bstr.len)) == NULL) { 
    265                         fprintf(stderr, "Error - Unable to allocate memory: %m.\n"); 
    266                         exit(EXIT_FAILURE); 
    267                 } 
    268  
    269                 if ((t = trie_find_memstr(&md5sum_trie, tmpbuf, strlen((char *)tmpbuf))) != NULL) { 
    270                     
    271                         // md5sum is already in trie 
    272                         free(tmpbuf); 
    273                         ((hash*)t->data)->cnt++; 
    274                         if (verbose) printf("    md5sum is %s (%u instances)\n", ((hash*)t->data)->md5sum, ((hash*)t->data)->cnt); 
    275                         if (verbose) printf("    absolute match found.\n"); 
    276                         num_of_duplicates++; 
    277                 } else { 
    278                         // md5sum not in trie, create new element 
    279                         t = trie_memins(&md5sum_trie, tmpbuf, strlen((char *)tmpbuf), NULL); 
    280                         if ((t->data = calloc(1, sizeof(hash))) == NULL) { 
    281                                 fprintf(stderr, "Error - Unable to allocate memory: %m.\n"); 
    282                                 exit(EXIT_FAILURE); 
    283                         } 
    284                         ((hash*)t->data)->hashlen = 32; 
    285                         ((hash*)t->data)->md5sum = (char *) tmpbuf; 
    286                         ((hash*)t->data)->cnt++; 
    287                         // set filename 
    288                         if ((((hash*)t->data)->filename = strdup(filename)) == NULL) { 
    289                                 fprintf(stderr, "Error - Unable to allocate memory: %m.\n"); 
    290                                 exit(EXIT_FAILURE); 
    291                         } 
    292                         // set spamsum hash 
    293                         if ((((hash*)t->data)->spamsum = spamsum(bstr.data, bstr.len, 0)) == NULL) { 
    294                                 fprintf(stderr, "Error - Unable to allocate memoory: %m.\n"); 
    295                                 exit(EXIT_FAILURE); 
    296                         } 
    297                         if (verbose) printf("    md5sum is %s (%u instances)\n", ((hash*)t->data)->md5sum, ((hash*)t->data)->cnt); 
    298  
    299                         // connect all clusters within range 
    300                         for (cur_cqelem = clusterq->head; cur_cqelem; cur_cqelem = cur_cqelem->next) { 
    301                                 for (cur_hqelem = ((cluster *)cur_cqelem->data)->hq->head; cur_hqelem; cur_hqelem = cur_hqelem->next) { 
    302                                         if ((score = spamsum_match(((hash*)t->data)->spamsum, ((hash*)cur_hqelem->data)->spamsum)) >= cluster_radius) { 
    303                                                 if (!((hash*)t->data)->cl) { 
    304                                                         add_entry_to_cluster((cluster *)cur_cqelem->data, (hash*)t->data); 
    305                                                         break; 
    306                                                 } else { 
    307                                                         if ((cluster *) cur_cqelem->data != ((hash*)t->data)->cl) 
    308                                                                 clusters_merge(clusterq, (cluster *) cur_cqelem->data, ((hash*)t->data)->cl); 
    309                                                         break; 
    310                                                 } 
    311                                         } 
    312                                 } 
    313                         } 
    314  
    315                         // connect all outliers within range 
    316                         for (cur_hqelem = outlierq->head; cur_hqelem; cur_hqelem = cur_hqelem->next) { 
    317                                 if ((score = spamsum_match(((hash*)t->data)->spamsum, ((hash*)cur_hqelem->data)->spamsum)) >= cluster_radius) { 
    318                                         // unlink match from outlier list 
    319                                         tmp_hqelem = cur_hqelem; 
    320                                         cur_hqelem = cur_hqelem->next; 
    321                                         if ((tmp_hash = queue_unlink(outlierq, tmp_hqelem)) == NULL) { 
    322                                                 fprintf(stderr, "Error - Unable to unlink outlier from queue.\n"); 
    323                                                 exit(EXIT_FAILURE); 
    324                                         } 
    325                                         if (((hash*)t->data)->cl) { 
    326                                                 // add other outliers to cluster 
    327                                                 if (add_entry_to_cluster(((hash*)t->data)->cl, tmp_hash) == NULL) { 
    328                                                         trie_del_memstr(&md5sum_trie, (u_char *) tmp_hash->md5sum, strlen(tmp_hash->md5sum)); 
    329                                                         trie_del_memstr(&spamsum_trie, (u_char *) tmp_hash->spamsum, strlen(tmp_hash->spamsum)); 
    330                                                         hash_free(tmp_hash, 0); 
    331                                                 } 
    332                                         } else { 
    333                                                 // create new cluster of two outliers 
    334                                                 if ((tmp_cqelem = queue_ins(clusterq, create_cluster(((hash*)t->data), tmp_hash), clusterq_max)) != NULL) { 
    335                                                         /* cluster queue is full, last element was dropped and must be free()d */ 
    336                                                          
    337                                                         /* first remove hashes from tries */ 
    338                                                         for (tmp_hqelem = ((cluster *)tmp_cqelem->data)->hq->head; tmp_hqelem; tmp_hqelem = tmp_hqelem->next) { 
    339                                                                 trie_del_memstr(&md5sum_trie, (u_char *) ((hash *)tmp_hqelem->data)->md5sum, 
    340                                                                         strlen(((hash *)tmp_hqelem->data)->md5sum)); 
    341                                                                 trie_del_memstr(&spamsum_trie, (u_char *) ((hash *)tmp_hqelem->data)->spamsum, 
    342                                                                         strlen(((hash *)tmp_hqelem->data)->spamsum)); 
    343                                                         } 
    344                                                         /* now free cluster */ 
    345                                                         cluster_free((cluster *)tmp_cqelem->data, 0); 
    346                                                         free(tmp_cqelem); 
    347                                                 } 
    348                                                 /* set cluster element's parent pointer to cluster queue head 
    349                                                  * we need this to be able to unlink a cluster from the queue */ 
    350                                                 ((cluster *) clusterq->head->data)->parent = clusterq->head; 
    351                                         } 
    352                                 } 
    353                                 if (!cur_hqelem) break; 
    354                         } 
    355  
    356                                  
    357                         if (verbose) printf("    spamsum is %s (%u instances)\n", ((hash*)t->data)->spamsum, ((hash*)t->data)->cnt); 
    358  
    359                         if (!((hash*)t->data)->cl) { 
    360                                 // insert outlier into queue 
    361                                 if (outlierq->size >= outlierq_max) { 
    362                                         tmp_hash = queue_unlink(outlierq, outlierq->tail); 
    363                                         trie_del_memstr(&md5sum_trie, (u_char *) tmp_hash->md5sum, strlen(tmp_hash->md5sum)); 
    364                                         trie_del_memstr(&spamsum_trie, (u_char *) tmp_hash->spamsum, strlen(tmp_hash->spamsum)); 
    365                                         hash_free(tmp_hash, 0); 
    366                                 } 
    367                                 queue_ins(outlierq, t->data, outlierq_max); 
    368                         } 
    369                 } 
    370                 bstr_unmap(bstr); 
    371                 if (verbose) printf("\n"); 
    372         } 
    373250 
    374251        cleanup(); 
  • nebula/trunk/src/nebula.h

    r1435 r1558  
    2727 
    2828#include <sys/types.h> 
     29#include <pthread.h> 
    2930 
     31#include "avl.h" 
    3032#include "cluster.h" 
    3133#include "hash.h" 
     34#include "stree.h" 
    3235 
    33 u_char          verbose, list_files, list_outlier, show_progress, hide_result; 
    34 struct dirent   **namelist; 
     36u_char          verbose; 
     37char            *secret; 
     38trie_node       spamsum_trie, md5sum_trie; 
     39 
    3540ssize_t         clusterq_max, clusterhashq_max, outlierq_max;  
    36 u_int32_t       num_of_duplicates; 
    37 float           num_of_files, total_files; 
    3841double          cluster_radius; 
    39 trie_node       spamsum_trie, md5sum_trie; 
     42 
    4043queue           *clusterq; 
    4144queue           *outlierq; 
     45queue           *aconnq; 
    4246 
    43 #ifdef PROFILE 
    44 int             alarm_time;     // number of seconds for profile output interval 
    45 float           files_in_interval; 
    46 float           bytes_in_interval; 
    47 u_int32_t       checkpoint; 
    48 #endif 
     47pthread_rwlock_t md5sum_trie_lock; 
     48pthread_rwlock_t spamsum_trie_lock; 
    4949 
    5050#endif 
  • nebula/trunk/src/queue.c

    r1433 r1558  
    3030        qelem *new; 
    3131         
    32         if (!data) return(NULL); 
     32        if (!q || !data) return(NULL); 
     33 
     34        pthread_rwlock_wrlock(&q->lock); 
    3335 
    3436        if ((new = calloc(1, sizeof(qelem))) == NULL) return(NULL); 
     
    4446        q->size++; 
    4547 
     48        pthread_rwlock_unlock(&q->lock); 
    4649        return(new); 
    4750} 
     
    5154        qelem *new; 
    5255         
    53         if (!data) return(NULL); 
     56        if (!q || !data) return(NULL); 
     57 
     58        pthread_rwlock_wrlock(&q->lock); 
    5459 
    5560        if ((new = calloc(1, sizeof(qelem))) == NULL) return(NULL); 
     
    6570        q->size++; 
    6671 
     72        pthread_rwlock_unlock(&q->lock); 
    6773        return(new); 
    6874} 
     
    7278        qelem *tmp; 
    7379 
    74         if (q->head == NULL) return(NULL); 
     80        if (!q || q->head == NULL) return(NULL); 
     81 
     82        pthread_rwlock_wrlock(&q->lock); 
    7583 
    7684        tmp = q->head; 
     
    8088        q->size--; 
    8189 
     90        pthread_rwlock_unlock(&q->lock); 
    8291        return(tmp); 
    8392} 
     
    8796        qelem *tmp; 
    8897 
    89         if (q->tail == NULL) return(NULL); 
     98        if (!q || q->tail == NULL) return(NULL); 
     99 
     100        pthread_rwlock_wrlock(&q->lock); 
    90101 
    91102        tmp = q->tail; 
     
    95106        q->size--; 
    96107 
     108        pthread_rwlock_unlock(&q->lock); 
    97109        return(tmp); 
    98110} 
     
    102114        qelem* tmp = NULL; 
    103115 
    104         if (!data) return(NULL); 
     116        if (!q || !data) return(NULL); 
    105117 
    106118        /* need to cut off last queue element? */ 
     
    124136                e = queue_cuttail(q); 
    125137        } else { 
     138                pthread_rwlock_wrlock(&q->lock); 
     139 
    126140                e->prev->next = e->next; 
    127141                e->next->prev = e->prev; 
    128142                if (!q->size--) q->head = q->tail = NULL; 
     143 
     144                pthread_rwlock_unlock(&q->lock); 
    129145        } 
    130146 
     
    142158                exit(EXIT_FAILURE); 
    143159        } 
     160        if (pthread_rwlock_init(&q->lock, NULL) != 0) { 
     161                fprintf(stderr, "Error - Unable to initialize queue mutex: %m.\n"); 
     162                exit(EXIT_FAILURE); 
     163        } 
     164 
    144165        return(q); 
    145166} 
    146167 
    147168 
    148 void queue_free(queue *q, u_char list_flag, void(*cbfn)(void *data, u_char list_flag)) { 
     169void queue_free(queue *q, void(*cbfn)(void *data)) { 
    149170        qelem *cur; 
    150171 
    151172        if (!q) return; 
    152173