mirror of
				https://github.com/simon987/sist2.git
				synced 2025-10-31 07:56:53 +00:00 
			
		
		
		
	do not throttle writer/index thread pools
This commit is contained in:
		
							parent
							
								
									f3674ffa02
								
							
						
					
					
						commit
						6075c21a3a
					
				| @ -257,8 +257,8 @@ int scan_args_validate(scan_args_t *args, int argc, const char **argv) { | |||||||
|         args->max_memory_buffer = DEFAULT_MAX_MEM_BUFFER; |         args->max_memory_buffer = DEFAULT_MAX_MEM_BUFFER; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     if (args->throttle_memory_threshold <= 0) { |     if (args->scan_mem_limit <= 0) { | ||||||
|         args->throttle_memory_threshold = DEFAULT_THROTTLE_MEMORY_THRESHOLD; |         args->scan_mem_limit = DEFAULT_THROTTLE_MEMORY_THRESHOLD; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     if (args->list_path != NULL) { |     if (args->list_path != NULL) { | ||||||
|  | |||||||
| @ -10,7 +10,7 @@ typedef struct scan_args { | |||||||
|     int size; |     int size; | ||||||
|     int content_size; |     int content_size; | ||||||
|     int threads; |     int threads; | ||||||
|     int throttle_memory_threshold; |     int scan_mem_limit; | ||||||
|     char *incremental; |     char *incremental; | ||||||
|     char *output; |     char *output; | ||||||
|     char *rewrite_url; |     char *rewrite_url; | ||||||
|  | |||||||
							
								
								
									
										10
									
								
								src/main.c
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								src/main.c
									
									
									
									
									
								
							| @ -253,7 +253,7 @@ void initialize_scan_context(scan_args_t *args) { | |||||||
| 
 | 
 | ||||||
|     ScanCtx.threads = args->threads; |     ScanCtx.threads = args->threads; | ||||||
|     ScanCtx.depth = args->depth; |     ScanCtx.depth = args->depth; | ||||||
|     ScanCtx.mem_limit = args->throttle_memory_threshold * 1024 * 1024; |     ScanCtx.mem_limit = args->scan_mem_limit * 1024 * 1024; | ||||||
| 
 | 
 | ||||||
|     strncpy(ScanCtx.index.path, args->output, sizeof(ScanCtx.index.path)); |     strncpy(ScanCtx.index.path, args->output, sizeof(ScanCtx.index.path)); | ||||||
|     strncpy(ScanCtx.index.desc.name, args->name, sizeof(ScanCtx.index.desc.name)); |     strncpy(ScanCtx.index.desc.name, args->name, sizeof(ScanCtx.index.desc.name)); | ||||||
| @ -383,10 +383,10 @@ void sist2_scan(scan_args_t *args) { | |||||||
|         load_incremental_index(args); |         load_incremental_index(args); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     ScanCtx.pool = tpool_create(args->threads, thread_cleanup, TRUE, TRUE); |     ScanCtx.pool = tpool_create(ScanCtx.threads, thread_cleanup, TRUE, TRUE, ScanCtx.mem_limit); | ||||||
|     tpool_start(ScanCtx.pool); |     tpool_start(ScanCtx.pool); | ||||||
| 
 | 
 | ||||||
|     ScanCtx.writer_pool = tpool_create(1, writer_cleanup, TRUE, FALSE); |     ScanCtx.writer_pool = tpool_create(1, writer_cleanup, TRUE, FALSE, 0); | ||||||
|     tpool_start(ScanCtx.writer_pool); |     tpool_start(ScanCtx.writer_pool); | ||||||
| 
 | 
 | ||||||
|     if (args->list_path) { |     if (args->list_path) { | ||||||
| @ -467,7 +467,7 @@ void sist2_index(index_args_t *args) { | |||||||
|         f = index_json; |         f = index_json; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, FALSE, args->print == 0); |     IndexCtx.pool = tpool_create(args->threads, elastic_cleanup, FALSE, args->print == 0, 0); | ||||||
|     tpool_start(IndexCtx.pool); |     tpool_start(IndexCtx.pool); | ||||||
| 
 | 
 | ||||||
|     READ_INDICES(file_path, args->index_path, { |     READ_INDICES(file_path, args->index_path, { | ||||||
| @ -587,7 +587,7 @@ int main(int argc, const char *argv[]) { | |||||||
| 
 | 
 | ||||||
|             OPT_GROUP("Scan options"), |             OPT_GROUP("Scan options"), | ||||||
|             OPT_INTEGER('t', "threads", &common_threads, "Number of threads. DEFAULT=1"), |             OPT_INTEGER('t', "threads", &common_threads, "Number of threads. DEFAULT=1"), | ||||||
|             OPT_STRING(0, "mem-throttle", &scan_args->throttle_memory_threshold, "Total memory threshold in MB for scan throttling. DEFAULT=0"), |             OPT_STRING(0, "mem-throttle", &scan_args->scan_mem_limit, "Total memory threshold in MB for scan throttling. DEFAULT=0"), | ||||||
|             OPT_FLOAT('q', "quality", &scan_args->quality, |             OPT_FLOAT('q', "quality", &scan_args->quality, | ||||||
|                       "Thumbnail quality, on a scale of 1.0 to 31.0, 1.0 being the best. DEFAULT=3"), |                       "Thumbnail quality, on a scale of 1.0 to 31.0, 1.0 being the best. DEFAULT=3"), | ||||||
|             OPT_INTEGER(0, "size", &scan_args->size, |             OPT_INTEGER(0, "size", &scan_args->size, | ||||||
|  | |||||||
| @ -29,6 +29,7 @@ typedef struct tpool { | |||||||
|     int done_cnt; |     int done_cnt; | ||||||
|     int busy_cnt; |     int busy_cnt; | ||||||
|     int throttle_stuck_cnt; |     int throttle_stuck_cnt; | ||||||
|  |     size_t mem_limit; | ||||||
| 
 | 
 | ||||||
|     int free_arg; |     int free_arg; | ||||||
|     int stop; |     int stop; | ||||||
| @ -171,7 +172,7 @@ static void *tpool_worker(void *arg) { | |||||||
| 
 | 
 | ||||||
|         if (work != NULL) { |         if (work != NULL) { | ||||||
|             stuck_notified = 0; |             stuck_notified = 0; | ||||||
|             while(!pool->stop && ScanCtx.mem_limit > 0 && _get_total_mem() >= ScanCtx.mem_limit) { |             while(!pool->stop && pool->mem_limit > 0 && _get_total_mem() >= pool->mem_limit) { | ||||||
|                 if (!stuck_notified && throttle_ms >= 90000) { |                 if (!stuck_notified && throttle_ms >= 90000) { | ||||||
|                     // notify the pool that this thread is stuck.
 |                     // notify the pool that this thread is stuck.
 | ||||||
|                     pthread_mutex_lock(&(pool->work_mutex)); |                     pthread_mutex_lock(&(pool->work_mutex)); | ||||||
| @ -299,7 +300,7 @@ void tpool_destroy(tpool_t *pool) { | |||||||
|  * Create a thread pool |  * Create a thread pool | ||||||
|  * @param thread_cnt Worker threads count |  * @param thread_cnt Worker threads count | ||||||
|  */ |  */ | ||||||
| tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int print_progress) { | tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int print_progress, size_t mem_limit) { | ||||||
| 
 | 
 | ||||||
|     tpool_t *pool = malloc(sizeof(tpool_t)); |     tpool_t *pool = malloc(sizeof(tpool_t)); | ||||||
|     pool->thread_cnt = thread_cnt; |     pool->thread_cnt = thread_cnt; | ||||||
| @ -307,6 +308,7 @@ tpool_t *tpool_create(int thread_cnt, void cleanup_func(), int free_arg, int pri | |||||||
|     pool->done_cnt = 0; |     pool->done_cnt = 0; | ||||||
|     pool->busy_cnt = 0; |     pool->busy_cnt = 0; | ||||||
|     pool->throttle_stuck_cnt = 0; |     pool->throttle_stuck_cnt = 0; | ||||||
|  |     pool->mem_limit = mem_limit; | ||||||
|     pool->stop = FALSE; |     pool->stop = FALSE; | ||||||
|     pool->free_arg = free_arg; |     pool->free_arg = free_arg; | ||||||
|     pool->cleanup_func = cleanup_func; |     pool->cleanup_func = cleanup_func; | ||||||
|  | |||||||
| @ -8,7 +8,7 @@ typedef struct tpool tpool_t; | |||||||
| 
 | 
 | ||||||
| typedef void (*thread_func_t)(void *arg); | typedef void (*thread_func_t)(void *arg); | ||||||
| 
 | 
 | ||||||
| tpool_t *tpool_create(int num, void (*cleanup_func)(), int free_arg, int print_progress); | tpool_t *tpool_create(int num, void (*cleanup_func)(), int free_arg, int print_progress, size_t mem_limit); | ||||||
| void tpool_start(tpool_t *pool); | void tpool_start(tpool_t *pool); | ||||||
| void tpool_destroy(tpool_t *pool); | void tpool_destroy(tpool_t *pool); | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user