mirror of
				https://github.com/terorie/od-database-crawler.git
				synced 2025-10-25 19:26:52 +00:00 
			
		
		
		
	Add resource stats logging
This commit is contained in:
		
							parent
							
								
									395a6f30b2
								
							
						
					
					
						commit
						add6581804
					
				
							
								
								
									
										13
									
								
								config.go
									
									
									
									
									
								
							
							
						
						
									
										13
									
								
								config.go
									
									
									
									
									
								
							| @ -14,7 +14,8 @@ var config struct { | |||||||
| 	Retries    int | 	Retries    int | ||||||
| 	Workers    int | 	Workers    int | ||||||
| 	Tasks      int32 | 	Tasks      int32 | ||||||
| 	StatsInterval time.Duration | 	CrawlStats time.Duration | ||||||
|  | 	AllocStats time.Duration | ||||||
| 	Verbose    bool | 	Verbose    bool | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -24,7 +25,8 @@ const ( | |||||||
| 	ConfTasks      = "crawl.tasks" | 	ConfTasks      = "crawl.tasks" | ||||||
| 	ConfRetries    = "crawl.retries" | 	ConfRetries    = "crawl.retries" | ||||||
| 	ConfWorkers    = "crawl.connections" | 	ConfWorkers    = "crawl.connections" | ||||||
| 	ConfStatsInterval = "output.stats_interval" | 	ConfCrawlStats = "output.crawl_stats" | ||||||
|  | 	ConfAllocStats = "output.resource_stats" | ||||||
| 	ConfVerbose    = "output.verbose" | 	ConfVerbose    = "output.verbose" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| @ -32,7 +34,8 @@ func prepareConfig() { | |||||||
| 	viper.SetDefault(ConfRetries, 5) | 	viper.SetDefault(ConfRetries, 5) | ||||||
| 	viper.SetDefault(ConfWorkers, 2) | 	viper.SetDefault(ConfWorkers, 2) | ||||||
| 	viper.SetDefault(ConfTasks, 3) | 	viper.SetDefault(ConfTasks, 3) | ||||||
| 	viper.SetDefault(ConfStatsInterval, 3 * time.Second) | 	viper.SetDefault(ConfCrawlStats, 3 * time.Second) | ||||||
|  | 	viper.SetDefault(ConfAllocStats, 0) | ||||||
| 	viper.SetDefault(ConfVerbose, false) | 	viper.SetDefault(ConfVerbose, false) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -70,7 +73,9 @@ func readConfig() { | |||||||
| 		configOOB(ConfTasks, int(config.Tasks)) | 		configOOB(ConfTasks, int(config.Tasks)) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	config.StatsInterval = viper.GetDuration(ConfStatsInterval) | 	config.CrawlStats = viper.GetDuration(ConfCrawlStats) | ||||||
|  | 
 | ||||||
|  | 	config.AllocStats = viper.GetDuration(ConfAllocStats) | ||||||
| 
 | 
 | ||||||
| 	config.Verbose = viper.GetBool(ConfVerbose) | 	config.Verbose = viper.GetBool(ConfVerbose) | ||||||
| 	if config.Verbose { | 	if config.Verbose { | ||||||
|  | |||||||
| @ -7,8 +7,10 @@ server: | |||||||
| 
 | 
 | ||||||
| # Log output settings | # Log output settings | ||||||
| output: | output: | ||||||
|   # Statistics printing interval |   # Crawl statistics | ||||||
|   stats_interval: 1s |   crawl_stats: 1s | ||||||
|  |   # CPU/RAM/Job queue stats | ||||||
|  |   resource_stats: 1s | ||||||
|   # More output? (Every listed dir) |   # More output? (Every listed dir) | ||||||
|   verbose: false |   verbose: false | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -7,6 +7,7 @@ import ( | |||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| var activeTasks int32 | var activeTasks int32 | ||||||
|  | var totalBuffered int64 | ||||||
| 
 | 
 | ||||||
| func Schedule(c context.Context, remotes <-chan *OD) { | func Schedule(c context.Context, remotes <-chan *OD) { | ||||||
| 	go Stats(c) | 	go Stats(c) | ||||||
| @ -75,6 +76,7 @@ func bufferJobs(c context.Context, in chan Job, out chan Job) { | |||||||
| 				if !ok { | 				if !ok { | ||||||
| 					in = nil | 					in = nil | ||||||
| 				} else { | 				} else { | ||||||
|  | 					atomic.AddInt64(&totalBuffered, 1) | ||||||
| 					inQueue = append(inQueue, v) | 					inQueue = append(inQueue, v) | ||||||
| 				} | 				} | ||||||
| 			case <-c.Done(): | 			case <-c.Done(): | ||||||
| @ -86,9 +88,11 @@ func bufferJobs(c context.Context, in chan Job, out chan Job) { | |||||||
| 				if !ok { | 				if !ok { | ||||||
| 					in = nil | 					in = nil | ||||||
| 				} else { | 				} else { | ||||||
|  | 					atomic.AddInt64(&totalBuffered, 1) | ||||||
| 					inQueue = append(inQueue, v) | 					inQueue = append(inQueue, v) | ||||||
| 				} | 				} | ||||||
| 			case outCh() <- inQueue[0]: | 			case outCh() <- inQueue[0]: | ||||||
|  | 				atomic.AddInt64(&totalBuffered, -1) | ||||||
| 				inQueue = inQueue[1:] | 				inQueue = inQueue[1:] | ||||||
| 			case <-c.Done(): | 			case <-c.Done(): | ||||||
| 				return | 				return | ||||||
|  | |||||||
							
								
								
									
										29
									
								
								stats.go
									
									
									
									
									
								
							
							
						
						
									
										29
									
								
								stats.go
									
									
									
									
									
								
							| @ -4,6 +4,7 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
| 	"github.com/sirupsen/logrus" | 	"github.com/sirupsen/logrus" | ||||||
| 	"math" | 	"math" | ||||||
|  | 	"runtime" | ||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| @ -15,14 +16,23 @@ var totalAborted uint64 | |||||||
| 
 | 
 | ||||||
| func Stats(c context.Context) { | func Stats(c context.Context) { | ||||||
| 	var startedLast uint64 = 0 | 	var startedLast uint64 = 0 | ||||||
| 	ticker := time.NewTicker(config.StatsInterval).C | 	var crawlTicker <-chan time.Time | ||||||
|  | 	var allocTicker <-chan time.Time | ||||||
|  | 
 | ||||||
|  | 	if config.CrawlStats != 0 { | ||||||
|  | 		crawlTicker = time.NewTicker(config.CrawlStats).C | ||||||
|  | 	} | ||||||
|  | 	if config.AllocStats != 0 { | ||||||
|  | 		allocTicker = time.NewTicker(config.AllocStats).C | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	for { | 	for { | ||||||
| 		select { | 		select { | ||||||
| 		case <-ticker: | 		case <-crawlTicker: | ||||||
| 			startedNow := atomic.LoadUint64(&totalStarted) | 			startedNow := atomic.LoadUint64(&totalStarted) | ||||||
| 
 | 
 | ||||||
| 			perSecond := float64(startedNow - startedLast) / | 			perSecond := float64(startedNow - startedLast) / | ||||||
| 				config.StatsInterval.Seconds() | 				config.CrawlStats.Seconds() | ||||||
| 
 | 
 | ||||||
| 			// Round to .5 | 			// Round to .5 | ||||||
| 			perSecond *= 2 | 			perSecond *= 2 | ||||||
| @ -34,10 +44,21 @@ func Stats(c context.Context) { | |||||||
| 				"done":    atomic.LoadUint64(&totalDone), | 				"done":    atomic.LoadUint64(&totalDone), | ||||||
| 				"retries": atomic.LoadUint64(&totalRetries), | 				"retries": atomic.LoadUint64(&totalRetries), | ||||||
| 				"aborted": atomic.LoadUint64(&totalAborted), | 				"aborted": atomic.LoadUint64(&totalAborted), | ||||||
| 			}).Info("Stats") | 			}).Info("Crawl Stats") | ||||||
| 
 | 
 | ||||||
| 			startedLast = startedNow | 			startedLast = startedNow | ||||||
| 
 | 
 | ||||||
|  | 		case <-allocTicker: | ||||||
|  | 			var mem runtime.MemStats | ||||||
|  | 			runtime.ReadMemStats(&mem) | ||||||
|  | 
 | ||||||
|  | 			logrus.WithFields(logrus.Fields{ | ||||||
|  | 				"queue_count": totalBuffered, | ||||||
|  | 				"heap": FormatByteCount(mem.Alloc), | ||||||
|  | 				"objects": mem.HeapObjects, | ||||||
|  | 				"num_gc": mem.NumGC, | ||||||
|  | 			}).Info("Resource Stats") | ||||||
|  | 
 | ||||||
| 		case <-c.Done(): | 		case <-c.Done(): | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  | |||||||
							
								
								
									
										18
									
								
								util.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								util.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,18 @@ | |||||||
|  | package main | ||||||
|  | 
 | ||||||
|  | import "fmt" | ||||||
|  | 
 | ||||||
|  | // https://programming.guide/go/formatting-byte-size-to-human-readable-format.html | ||||||
|  | func FormatByteCount(b uint64) string { | ||||||
|  | 	const unit = 1024 | ||||||
|  | 	if b < unit { | ||||||
|  | 		return fmt.Sprintf("%d B", b) | ||||||
|  | 	} else { | ||||||
|  | 		div, exp := int64(unit), 0 | ||||||
|  | 		for n := b / unit; n >= unit; n /= unit { | ||||||
|  | 			div *= unit | ||||||
|  | 			exp++ | ||||||
|  | 		} | ||||||
|  | 		return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) | ||||||
|  | 	} | ||||||
|  | } | ||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user