Changeset 1618
- Timestamp:
- 04/16/08 10:33:50 (1 month ago)
- Files:
-
- nebula/trunk/ChangeLog (modified) (1 diff)
- nebula/trunk/configure.in (modified) (2 diffs)
- nebula/trunk/src/classify.c (modified) (7 diffs)
- nebula/trunk/src/classify.h (modified) (1 diff)
- nebula/trunk/src/nebula.c (modified) (14 diffs)
- nebula/trunk/src/nebula.h (modified) (1 diff)
- nebula/trunk/src/queue.h (modified) (1 diff)
- nebula/trunk/src/session.c (modified) (2 diffs)
- nebula/trunk/src/sig.c (modified) (2 diffs)
- nebula/trunk/src/sig.h (modified) (1 diff)
- nebula/trunk/src/signals.c (modified) (2 diffs)
- nebula/trunk/src/util.c (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
nebula/trunk/ChangeLog
r1611 r1618 1 0.2.2 2 - new threading concept: one clustering thread and one or more signature generation threads poll job queues 1 3 0.2.1 2 4 - polling corrected nebula/trunk/configure.in
r1581 r1618 1 1 # $Id$ 2 2 AC_PREREQ(02.50) 3 AC_INIT([nebula], [0.2. 1], [tillmann.werner@gmx.de])3 AC_INIT([nebula], [0.2.2], [tillmann.werner@gmx.de]) 4 4 AM_CONFIG_HEADER(config.h) 5 AM_INIT_AUTOMAKE(nebula,0.2. 1)5 AM_INIT_AUTOMAKE(nebula,0.2.2) 6 6 7 7 AC_PROG_CC … … 29 29 # Check for electric fence malloc debugger 30 30 AC_ARG_ENABLE(efence, 31 [ --enable-efence link with electric fence], enable_efence="X", enable_efence=" ")31 [ --enable-efence link with electric fence], enable_efence="X", enable_efence=" ") 32 32 if test "$enable_efence" = "X"; then 33 33 AC_CHECK_LIB(efence, EF_ALIGNMENT, LIBS="${LIBS} -lefence", AC_MSG_ERROR(libefence not found)) nebula/trunk/src/classify.c
r1617 r1618 23 23 #include <stdlib.h> 24 24 #include <string.h> 25 #include <sys/select.h> 26 #include <unistd.h> 25 27 26 28 #include "classify.h" 27 29 #include "cluster.h" 28 30 #include "nebula.h" 31 #include "queue.h" 29 32 #include "spamsum.h" 30 33 #include "session.h" … … 36 39 37 40 // pthread wrapper function 38 void *pt_classify(void *s) { 39 int rv; 40 41 if (!s) return((void *) 0); 42 43 // this thread needs to unlock the sessions mutex 44 // so that the session ID can be reused in the main thread 45 pthread_mutex_unlock(&sessions_mutex); 46 47 // call classification function 48 if ((rv = classify(s)) == -1) { 49 fprintf(stderr, "Error - Unable to classify submission.\n"); 50 return((void *) -1); 51 } else if (rv == 0) { 52 // known attack, do nothing 53 return((void *) 0); 54 } else { 55 // new attack 56 free(s); 57 } 41 void *pt_pollcq(void *param) { 42 int rv; 43 qelem *elem; 44 struct timeval timeout; 45 fd_set rfds; 46 u_char buf; 47 48 for (;;) { 49 FD_ZERO(&rfds); 50 FD_SET(cpipe[0], &rfds); 51 52 timeout.tv_sec = 10; 53 timeout.tv_usec = 0; 54 55 // this thread just processes a clustering job queue 56 // it reads wakeup calls from a pipe 57 switch (select(cpipe[0]+1, &rfds, NULL, NULL, &timeout)) { 58 case -1: 59 fprintf(stderr, "Error - select() failed: %s.\n", strerror(errno)); 60 exit(EXIT_FAILURE); 61 case 0: 62 break; 63 default: 64 if (FD_ISSET(cpipe[0], &rfds)) { 65 if (read(cpipe[0], &buf, 1) != 1) { 66 fprintf(stderr, "Error - Unable to read from pipe: %s.\n", strerror(errno)); 67 exit(EXIT_FAILURE); 68 } 69 } 70 pthread_mutex_lock(&classifyq_mutex); 71 elem = queue_cuthead(classifyq); 72 pthread_mutex_unlock(&classifyq_mutex); 73 74 if (elem) { 75 // call classification function 76 if ((rv = classify((submission *) elem->data)) == -1) { 77 fprintf(stderr, "Error - Unable to classify submission.\n"); 78 free(elem); 79 return((void *) -1); 80 } else if (rv == 0) { 81 // known attack, do nothing 82 free(elem); 83 return((void *) 0); 84 } else { 85 // new attack 86 free(elem->data); 87 free(elem); 88 } 89 } 90 break; 91 } 92 } 93 58 94 return((void *) 1); 59 95 } … … 65 101 hash *tmp_hash; 66 102 u_char *tmpbuf; 67 qelem *cur_cqelem, *tmp_cqelem, *cur_hqelem, *tmp_hqelem, *qtail ;103 qelem *cur_cqelem, *tmp_cqelem, *cur_hqelem, *tmp_hqelem, *qtail, *jqelem; 68 104 double score, max_score; 69 pthread_t ntid;70 pthread_attr_t ptattr;71 105 72 106 qtail = NULL; … … 111 145 for (cur_cqelem = clusterq->head; cur_cqelem; cur_cqelem = cur_cqelem->next) { 112 146 for (cur_hqelem = ((cluster *)cur_cqelem->data)->hq->head; cur_hqelem; cur_hqelem = cur_hqelem->next) { 113 if ((score = spamsum_match(((hash*)t->data)->spamsum, ((hash*)cur_hqelem->data)->spamsum)) >= cluster_radius) {147 if ((score = spamsum_match(((hash*)t->data)->spamsum, ((hash*)cur_hqelem->data)->spamsum)) >= sim_threshold) { 114 148 if (!((hash*)t->data)->cl) { 115 149 Q_ULOCK(&clusterq->lock); … … 149 183 Q_RLOCK(&outlierq->lock); 150 184 for (cur_hqelem = outlierq->head; cur_hqelem; cur_hqelem = cur_hqelem->next) { 151 if ((score = spamsum_match(((hash*)t->data)->spamsum, ((hash*)cur_hqelem->data)->spamsum)) >= cluster_radius) {185 if ((score = spamsum_match(((hash*)t->data)->spamsum, ((hash*)cur_hqelem->data)->spamsum)) >= sim_threshold) { 152 186 // unlink match from outlier list 153 187 … … 240 274 } 241 275 242 // check for signature generation criterion here 276 pthread_mutex_lock(&siggenq_mutex); 277 // check for signature generation criterion 243 278 if ((((hash*)t->data)->cl) && ((hash*)t->data)->cl->hq->size >= ((hash*)t->data)->cl->threshold) { 244 279 printf("[=] Cluster size (%lu) hit threshold (%lu), generating signature.\n", … … 246 281 247 282 // increment size threshold 283 if (((long unsigned) ((hash*)t->data)->cl->hq->size) > ((hash*)t->data)->cl->threshold) 284 ((hash*)t->data)->cl->threshold = ((long unsigned) ((hash*)t->data)->cl->hq->size); 248 285 ((hash*)t->data)->cl->threshold *= 1.2; 249 286 250 if (pthread_attr_init(&ptattr) || 251 pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 252 fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 253 exit(EXIT_FAILURE); 287 // make sure every cluster gets queued only once 288 for (jqelem = siggenq->head; jqelem && jqelem->data != ((hash*)t->data)->cl; jqelem = jqelem->next); 289 290 // queue cluster for signature generation 291 if (!jqelem || jqelem->data != ((hash*)t->data)->cl) { 292 // append cluster to signature generation job queue 293 if (queue_append(siggenq, (void *) (void *) ((hash*)t->data)->cl) == NULL) { 294 fprintf(stderr, "Error - Unable to queue cluster for signature generation.\n"); 295 exit(EXIT_FAILURE); 296 } 297 if (write(gpipe[1], "x", sizeof(u_char)) == -1) { 298 fprintf(stderr, "Error - Unable to write to pipe: %s.\n", strerror(errno)); 299 exit(EXIT_FAILURE); 300 } 301 } else { 302 printf("[=] Cluster is already queued for signature generation.\n"); 254 303 } 255 if (pthread_create(&ntid, NULL, pt_siggen, (void *) ((hash*)t->data)->cl)) { 256 fprintf(stderr, "Error - Cannot create signature generation thread: %s.\n", strerror(errno)); 257 exit(EXIT_FAILURE); 258 } 259 pthread_attr_destroy(&ptattr); 260 } 304 } 305 pthread_mutex_unlock(&siggenq_mutex); 261 306 262 307 return(1); nebula/trunk/src/classify.h
r1558 r1618 28 28 #include "session.h" 29 29 30 void *pt_pollcq(void *param); 30 31 void *pt_classify(void *s); 31 32 int classify(submission *s); nebula/trunk/src/nebula.c
r1614 r1618 59 59 "\t\t -d\t\t daemonize\n" 60 60 "\t\t -e\t\t minimum substring entropy\n" 61 "\t\t -g\t\t number of signature generation threads\n" 61 62 "\t\t -h\t\t this help\n" 62 63 "\t\t -E <size>\t cluster element queue size\n" … … 77 78 78 79 int main(int argc, char *argv[]) { 79 int i, qsize, port, listen_fd, rv ;80 int i, qsize, port, listen_fd, rv, gthread_no; 80 81 u_char daemonize; 81 82 char option; … … 96 97 aconnq = NULL; 97 98 99 classifyq = NULL; // classification job queue 100 siggenq = NULL; // signature generation job queue 101 98 102 i = 0; 99 103 qsize = 0; … … 105 109 rules_file = NULL; // a: NULL 106 110 clusterq_max = 5000; // C 107 cluster_radius = 95.0; // c: 95% similarity as cluster criterion111 sim_threshold = 70.0; // c: 70% similarity as cluster criterion 108 112 daemonize = 0; // d: 0 109 113 clusterhashq_max = 500000; // E 114 gthread_no = 1; // g 110 115 snort_pid = 0; // h 111 116 outlierq_max = 500000; // O … … 124 129 pthread_rwlock_init(&sighash_trie_lock, NULL) || 125 130 pthread_rwlock_init(&sidlock, NULL) || 126 pthread_mutex_init(&sessions_mutex, NULL) || 131 pthread_mutex_init(&classifyq_mutex, NULL) || 132 pthread_mutex_init(&siggenq_mutex, NULL) || 127 133 pthread_mutex_init(&siggen_mutex, NULL) || 128 134 pthread_mutex_init(&sigwrite_mutex, NULL)) { … … 134 140 135 141 // process args 136 while((option = getopt(argc, argv, "a:c:C:de:E: hi:l:O:p:r:s:t:v?")) > 0) {142 while((option = getopt(argc, argv, "a:c:C:de:E:g:hi:l:O:p:r:s:t:v?")) > 0) { 137 143 switch(option) { 138 144 case 'a': … … 140 146 break; 141 147 case 'c': 142 cluster_radius= atof(optarg);143 if (( cluster_radius < 0) || (cluster_radius> 100)) {148 sim_threshold = atof(optarg); 149 if ((sim_threshold < 0) || (sim_threshold > 100)) { 144 150 fprintf(stderr, "Error - Cluster radius must be a value between 0 and 100 (in percent).\n"); 145 151 exit(EXIT_FAILURE); … … 170 176 } 171 177 break; 178 case 'g': 179 gthread_no = atoi(optarg); 180 if (gthread_no < 1) { 181 fprintf(stderr, "Error - Need at least one signature generation thread.\n"); 182 exit(EXIT_FAILURE); 183 } 184 break; 172 185 case 'i': 173 186 global_sid = strtoul(optarg, NULL, 10); … … 228 241 // set up signal handling 229 242 set_signal_handlers(); 243 244 245 // spawn classification thread 246 // the main thread writes to a pipe to inform the classification thread about pending jobs 247 248 if (pipe(cpipe) == -1) { 249 fprintf(stderr, "Error - Unable to create interthread communication pipe: %s.\n", strerror(errno)); 250 exit(EXIT_FAILURE); 251 } 252 253 if (pthread_attr_init(&ptattr) || pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 254 fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 255 exit(EXIT_FAILURE); 256 } 257 if (pthread_create(&ntid, &ptattr, pt_pollcq, NULL)) { 258 fprintf(stderr, "Error - Cannot create clustering thread: %s.\n", strerror(errno)); 259 exit(EXIT_FAILURE); 260 } 261 pthread_attr_destroy(&ptattr); 262 263 264 265 // spawn signature generation thread(s) 266 // the main thread writes to a pipe to inform the generation thread(s) about pending jobs 267 268 for (; gthread_no; gthread_no--) { 269 if (verbose > 1) printf(" Signature generation thread spawned.\n"); 270 if (pipe(gpipe) == -1) { 271 fprintf(stderr, "Error - Unable to create interthread communication pipe: %s.\n", strerror(errno)); 272 exit(EXIT_FAILURE); 273 } 274 if (pthread_attr_init(&ptattr) || pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 275 fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 276 exit(EXIT_FAILURE); 277 } 278 if (pthread_create(&ntid, &ptattr, pt_pollgq, (void *) gthread_no)) { 279 fprintf(stderr, "Error - Cannot create signature generation thread: %s.\n", strerror(errno)); 280 exit(EXIT_FAILURE); 281 } 282 pthread_attr_destroy(&ptattr); 283 } 284 if (verbose > 1) putchar('\n'); 230 285 231 286 … … 255 310 256 311 // initialize queues 257 outlierq = queue_new(); 258 clusterq = queue_new(); 312 outlierq = queue_new(); 313 clusterq = queue_new(); 314 classifyq = queue_new(); 315 siggenq = queue_new(); 259 316 260 317 … … 274 331 printf(" Initial snort signature ID: %u\n", global_sid); 275 332 printf(" Initial cluster size threshold: %lu\n", initial_threshold); 276 printf(" Cluster criterion (minimal similarity): %.1f percent\n", cluster_radius);333 printf(" Cluster criterion (minimal similarity): %.1f percent\n", sim_threshold); 277 334 printf(" Accepting submissions on port %u/tcp.\n", port); 278 335 } 279 p rintf("\n");336 putchar('\n'); 280 337 281 338 … … 312 369 if ((LISTEN_SOCK.revents & POLLIN) && pollfd_set_size < POLLFD_MAX_SET_SIZE) { 313 370 // incoming connection, accept it and place fd in pollfd set 314 pthread_mutex_lock(&sessions_mutex);315 316 371 pollfd_set_size++; 317 372 … … 320 375 321 376 memset(&s[pollfd_set_size], 0, sizeof(submission)); 322 323 pthread_mutex_unlock(&sessions_mutex);324 377 325 378 if (verbose > 1) printf("[>] Connection accepted: %d.\n", pfdset[pollfd_set_size].fd); … … 356 409 memset(&s[i], 0, sizeof(submission)); 357 410 358 if (pthread_attr_init(&ptattr) || 359 pthread_attr_setdetachstate(&ptattr, PTHREAD_CREATE_DETACHED)) { 360 fprintf(stderr, "Error - Unable to initialize thread attributes: %s.\n", strerror(errno)); 411 // add session to classify queue 412 // and write a byte to the itc thread to wakeup the classification thread 413 pthread_mutex_lock(&classifyq_mutex); 414 if (queue_append(classifyq, (void *) tmp_submission) == NULL) { 415 fprintf(stderr, "Error - Unable to queue session for classification.\n"); 416 exit(EXIT_FAILURE); 417 } 418 if (write(cpipe[1], "x", sizeof(u_char)) == -1) { 419 fprintf(stderr, "Error - Unable to write to pipe: %s.\n", strerror(errno)); 361 420 exit(EXIT_FAILURE); 362 421 } 363 pthread_mutex_lock(&sessions_mutex); 364 if (pthread_create(&ntid, &ptattr, pt_classify, (void *) tmp_submission)) { 365 fprintf(stderr, "Error - Cannot create clustering thread: %s.\n", strerror(errno)); 366 exit(EXIT_FAILURE); 367 } 368 pthread_attr_destroy(&ptattr); 422 pthread_mutex_unlock(&classifyq_mutex); 369 423 370 424 session_reset(&s[i], i); nebula/trunk/src/nebula.h
r1614 r1618 39 39 #define LISTEN_SOCK pfdset[0] 40 40 41 submission s[POLLFD_MAX_SET_SIZE+1];42 struct pollfd pfdset[POLLFD_MAX_SET_SIZE+1];43 nfds_t pollfd_set_size; // size of dynamically adjusted set41 submission s[POLLFD_MAX_SET_SIZE+1]; 42 struct pollfd pfdset[POLLFD_MAX_SET_SIZE+1]; 43 nfds_t pollfd_set_size; // size of dynamically adjusted set 44 44 45 int lock_mutex;45 int lock_mutex, cpipe[2], gpipe[2]; 46 46 47 u_char verbose;48 char *secret;49 trie_node spamsum_trie, md5sum_trie, sighash_trie;47 u_char verbose; 48 char *secret; 49 trie_node spamsum_trie, md5sum_trie, sighash_trie; 50 50 51 ssize_t clusterq_max, clusterhashq_max, outlierq_max;52 double cluster_radius;51 ssize_t clusterq_max, clusterhashq_max, outlierq_max; 52 double sim_threshold; 53 53 54 unsigned long intinitial_threshold;54 unsigned long initial_threshold; 55 55 56 ssize_t min_sstr_len;57 double min_sstr_ent;56 ssize_t min_sstr_len; 57 double min_sstr_ent; 58 58 59 queue *clusterq;60 queue *outlierq;61 queue *aconnq;59 queue *clusterq; 60 queue *outlierq; 61 queue *aconnq; 62 62 63 pthread_mutex_t sessions_mutex; 63 queue *classifyq; // clustering job queue 64 queue *siggenq; // signature generation job queue 65 66 67 pthread_mutex_t classifyq_mutex; 68 pthread_mutex_t siggenq_mutex; 69 64 70 pthread_rwlock_t md5sum_trie_lock; 65 71 pthread_rwlock_t spamsum_trie_lock; nebula/trunk/src/queue.h
r1585 r1618 49 49 queue *queue_new(void); 50 50 void queue_free(queue *q, void(*cbfn)(void *data)); 51 inline qelem *queue_append(queue *q, void *data); 52 inline qelem *queue_cuthead(queue *q); 51 53 qelem *queue_ins(queue *q, void *data, ssize_t max_size); 52 54 inline qelem *queue_cuttail(queue *q); nebula/trunk/src/session.c
r1612 r1618 38 38 if (!subm) return; 39 39 40 pthread_mutex_lock(&sessions_mutex);41 42 40 free(subm->cattack); 43 41 free(subm->attack); … … 59 57 pollfd_set_size--; 60 58 } 61 62 pthread_mutex_unlock(&sessions_mutex);63 59 64 60 fflush(stdout); nebula/trunk/src/sig.c
r1614 r1618 25 25 #include <stdlib.h> 26 26 #include <string.h> 27 #include <sys/select.h> 27 28 #include <sys/types.h> 29 #include <unistd.h> 28 30 29 31 #include "cluster.h" … … 354 356 355 357 356 void *pt_siggen(void *cl) { 358 void *pt_pollgq(void *param) { 359 qelem *elem; 360 struct timeval timeout; 361 fd_set rfds; 362 u_char buf; 363 364 for (;;) { 365 FD_ZERO(&rfds); 366 FD_SET(gpipe[0], &rfds); 367 368 timeout.tv_sec = 10; 369 timeout.tv_usec = 0; 370 371 switch (select(gpipe[0]+1, &rfds, NULL, NULL, &timeout)) { 372 case -1: 373 fprintf(stderr, "Error - select() failed: %s.\n", strerror(errno)); 374 exit(EXIT_FAILURE); 375 case 0: 376 break; 377 default: 378 if (FD_ISSET(gpipe[0], &rfds)) { 379 if (read(gpipe[0], &buf, 1) != 1) { 380 fprintf(stderr, "Error - Unable to read from pipe: %s.\n", strerror(errno)); 381 exit(EXIT_FAILURE); 382 } 383 } 384 pthread_mutex_lock(&siggenq_mutex); 385 elem = queue_cuthead(siggenq); 386 pthread_mutex_unlock(&siggenq_mutex); 387 388 if (elem != NULL) { 389 gensig(elem->data); 390 free(elem); 391 } 392 393 break; 394 } 395 } 396 397 return((void *) 1); 398 } 399 400 401 void *gensig(void *cl) { 357 402 cluster *attacks = cl; 358 403 lchar *content; nebula/trunk/src/sig.h
r1611 r1618 90 90 } sseg; 91 91 92 93 void * pt_siggen(void *cl);92 void *pt_pollgq(void *param); 93 void *gensig(void *cl); 94 94 void build_sig(signature *sig, stree *t, lcatbl *lca_table, stnode **leaves, ssize_t num_leaves, ssize_t min_len, double min_ent); 95 95 int append_snortsig_to_rulefile(const char *filename, const stree *t, const signature *sig, const sseg *seglist, const ssize_t num_frags); nebula/trunk/src/signals.c
r1613 r1618 52 52 } 53 53 if (sig == SIGINCC) { 54 if ( cluster_radius== 100.0) return;55 if ( cluster_radius< 95.0) {56 cluster_radius+= 5.0;54 if (sim_threshold == 100.0) return; 55 if (sim_threshold < 95.0) { 56 sim_threshold += 5.0; 57 57 write(STDOUT_FILENO, "[*] Cluster criterion increased by 5 percent\n", 45); 58 58 } else { 59 cluster_radius= 100.0;59 sim_threshold = 100.0; 60 60 write(STDOUT_FILENO, "[*] Cluster criterion increased to 100 percent\n", 46); 61 61 } … … 63 63 } 64 64 if (sig == SIGDECC) { 65 if ( cluster_radius== 0.0) return;66 if ( cluster_radius> 5.0) {67 cluster_radius-= 5.0;65 if (sim_threshold == 0.0) return; 66 if (sim_threshold > 5.0) { 67 sim_threshold -= 5.0; 68 68 write(STDOUT_FILENO, "[*] Cluster criterion decreased by 5 percent\n", 45); 69 69 } else { 70 cluster_radius= 0.0;70 sim_threshold = 0.0; 71 71 write(STDOUT_FILENO, "[*] Cluster criterion decreased to 0 percent\n", 45); 72 72 } nebula/trunk/src/util.c
r1612 r1618 114 114 for (i = 0; i <= POLLFD_MAX_SET_SIZE; i++) shutdown(pfdset[i].fd, SHUT_RDWR); 115 115 116 // free data structures 116 // free job queues 117 queue_free(siggenq, NULL); 118 queue_free(classifyq, NULL); 119 120 // free hash queues 121 queue_free(clusterq, cluster_free); 117 122 queue_free(outlierq, hash_free); 118 queue_free(clusterq, cluster_free);119 123 124 // free tries 120 125 trie_delete(md5sum_trie.childlist, md5sum_trie.childlist_len, NULL); 121 122 126 trie_delete(spamsum_trie.childlist, spamsum_trie.childlist_len, NULL); 123 124 127 trie_delete(sighash_trie.childlist, sighash_trie.childlist_len, NULL); 125 128 126 pthread_mutex_destroy(&sessions_mutex);127 129 pthread_mutex_destroy(&siggen_mutex); 128 130 pthread_mutex_destroy(&sigwrite_mutex);
