limit max workers

Cyberes 2023-07-20 13:06:19 -06:00
parent 4e9d3265fd
commit 627f4d2069
6 changed files with 22 additions and 21 deletions

View File

@@ -15,13 +15,6 @@ var WorkerBufferSize int
 var PrintNew bool
 var RootDir string
 var CrawlerParseMIME bool
-var MaxWorkers int
-var itemPool = &sync.Pool{
-	New: func() interface{} {
-		return &data.Item{}
-	},
-}
 
 type DirectoryCrawler struct {
 	cache *lru.Cache[string, *data.Item]

src/cache/Worker.go vendored
View File

@@ -32,6 +32,8 @@ func (w *Worker) start(dc *DirectoryCrawler) {
 			dc.cache.Add(StripRootDir(path, RootDir), NewItem(path, info))
 		}
 		w.active = false
+		// Release the token back to the semaphore when the worker is done
+		<-WorkerSemaphore
 	}()
 }

View File

@@ -2,6 +2,8 @@ package cache
 
 import "sync"
 
+var WorkerSemaphore chan struct{}
+
 type WorkerPool struct {
 	pool chan *Worker
 	wg   sync.WaitGroup
@@ -9,7 +11,7 @@ type WorkerPool struct {
 func NewWorkerPool() *WorkerPool {
 	return &WorkerPool{
-		pool: make(chan *Worker, MaxWorkers),
+		pool: make(chan *Worker, cap(WorkerSemaphore)), // use the capacity of the semaphore as the size of the pool
 	}
 }
@@ -18,6 +20,8 @@ func (p *WorkerPool) Get() *Worker {
 	case w := <-p.pool:
 		return w
 	default:
+		// Acquire a token from the semaphore
+		WorkerSemaphore <- struct{}{}
 		return newWorker(len(p.pool))
 	}
 }
@@ -26,7 +30,8 @@ func (p *WorkerPool) Put(w *Worker) {
 	select {
 	case p.pool <- w:
 	default:
-		// If the pool is full, discard the worker
+		// If the pool is full, discard the worker and release the token back to the semaphore
+		<-WorkerSemaphore
 	}
 }
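
Taken together, these hunks swap the old MaxWorkers integer for a buffered channel used as a counting semaphore: creating a new worker first acquires a token, and the token is released when the worker finishes (Worker.go above) or when a surplus worker is discarded. Below is a minimal, self-contained sketch of that pattern; it is not code from this repository, and the startWorker name and the cap of 4 are illustrative stand-ins.

package main

import (
	"fmt"
	"sync"
)

// workerSemaphore is a counting semaphore: its capacity is the worker cap,
// and each running worker holds exactly one token.
var workerSemaphore = make(chan struct{}, 4)

func startWorker(id int, wg *sync.WaitGroup) {
	workerSemaphore <- struct{}{} // acquire a token; blocks while 4 workers are already active
	go func() {
		defer wg.Done()
		defer func() { <-workerSemaphore }() // release the token when this worker finishes
		fmt.Printf("worker %d running (cap %d)\n", id, cap(workerSemaphore))
	}()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		startWorker(i, &wg)
	}
	wg.Wait()
}

Sizing the idle-worker pool with cap(WorkerSemaphore), as NewWorkerPool now does, means a single configuration value drives both the concurrency cap and the pool size.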

View File

@@ -10,6 +10,12 @@ import (
 	"time"
 )
 
+var itemPool = &sync.Pool{
+	New: func() interface{} {
+		return &data.Item{}
+	},
+}
+
 func StartCrawler(basePath string, sharedCache *lru.Cache[string, *data.Item], cfg *config.Config) error {
 	log = logging.GetLogger()
 	var wg sync.WaitGroup
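
The itemPool block removed from the first file reappears here unchanged, next to StartCrawler. For readers unfamiliar with sync.Pool, here is a hedged, self-contained sketch of how a pool like this is typically used; the local Item struct, the path value, and the reset step are illustrative only, since the actual Get/Put call sites are outside these hunks.

package main

import (
	"fmt"
	"sync"
)

// Item stands in for data.Item; the real struct lives in the repository's data package.
type Item struct {
	Path string
}

var itemPool = &sync.Pool{
	New: func() interface{} {
		return &Item{}
	},
}

func main() {
	it := itemPool.Get().(*Item) // reuses a previously returned Item when one is available
	it.Path = "/some/path"
	fmt.Println(it.Path)

	*it = Item{}     // reset so stale fields don't leak to the next borrower
	itemPool.Put(it) // hand the item back for reuse
}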

View File

@@ -12,7 +12,6 @@ type Config struct {
 	WatchMode              string
 	CrawlModeCrawlInterval int
 	DirectoryCrawlers      int
-	CrawlWorkers           int
 	WatchInterval          int
 	CacheSize              int
 	CacheTime              int
@@ -30,7 +29,7 @@ type Config struct {
 	ApiSearchMaxResults      int
 	ApiSearchShowChildren    bool
 	CrawlerChannelBufferSize int
-	CrawlerWorkerPoolSize    int
+	CrawlerMaxWorkers        int
 }
 
 func LoadConfig(configFile string) (*Config, error) {
@@ -40,7 +39,6 @@ func LoadConfig(configFile string) (*Config, error) {
 	viper.SetDefault("watch_mode", "crawl")
 	viper.SetDefault("crawl_mode_crawl_interval", 3600)
 	viper.SetDefault("directory_crawlers", 4)
-	viper.SetDefault("crawl_workers", 10)
 	viper.SetDefault("cache_size", 100000000)
 	viper.SetDefault("cache_time", 30)
 	viper.SetDefault("cache_print_new", false)
@@ -56,7 +54,7 @@ func LoadConfig(configFile string) (*Config, error) {
 	viper.SetDefault("api_search_show_children", false)
 	viper.SetDefault("http_allow_during_initial_crawl", false)
 	viper.SetDefault("crawler_channel_buffer_size", 1000)
-	viper.SetDefault("crawler_worker_pool_size", 200)
+	viper.SetDefault("crawler_max_workers", 200)
 
 	err := viper.ReadInConfig()
 	if err != nil {
@@ -77,7 +75,6 @@ func LoadConfig(configFile string) (*Config, error) {
 		CrawlModeCrawlInterval: viper.GetInt("crawl_mode_crawl_interval"),
 		WatchInterval:          viper.GetInt("watch_interval"),
 		DirectoryCrawlers:      viper.GetInt("crawl_mode_crawl_interval"),
-		CrawlWorkers:           viper.GetInt("crawl_workers"),
 		CacheSize:              viper.GetInt("cache_size"),
 		CacheTime:              viper.GetInt("cache_time"),
 		CachePrintNew:          viper.GetBool("cache_print_new"),
@@ -94,7 +91,7 @@ func LoadConfig(configFile string) (*Config, error) {
 		ApiSearchMaxResults:      viper.GetInt("api_search_max_results"),
 		ApiSearchShowChildren:    viper.GetBool("api_search_show_children"),
 		CrawlerChannelBufferSize: viper.GetInt("crawler_channel_buffer_size"),
-		CrawlerWorkerPoolSize:    viper.GetInt("crawler_worker_pool_size"),
+		CrawlerMaxWorkers:        viper.GetInt("crawler_worker_pool_size"),
 	}
 
 	if config.WatchMode != "crawl" && config.WatchMode != "watch" {
@@ -109,12 +106,8 @@ func LoadConfig(configFile string) (*Config, error) {
 		return nil, errors.New("crawl_mode_crawl_interval must be more than 1")
 	}
 
-	if config.CrawlWorkers < 1 {
-		return nil, errors.New("crawl_workers must be more than 1")
-	}
-
 	if config.CacheSize < 1 {
-		return nil, errors.New("crawl_workers must be more than 1")
+		return nil, errors.New("cache_size must be more than 1")
 	}
 
 	if config.CacheRecacheCrawlerLimit < 1 {

View File

@@ -81,11 +81,13 @@ func main() {
 		log.Fatalf("Failed to load config file: %s", err)
 	}
 
+	// Set global constants
 	cache.WorkerBufferSize = cfg.CrawlerChannelBufferSize
 	cache.PrintNew = cfg.CachePrintNew
 	cache.RootDir = cfg.RootDir
 	cache.CrawlerParseMIME = cfg.CrawlerParseMIME
-	cache.MaxWorkers = cfg.CrawlWorkers
+	//cache.MaxWorkers = cfg.CrawlWorkers
+	cache.WorkerSemaphore = make(chan struct{}, cfg.CrawlerMaxWorkers)
 
 	sharedCache, err := lru.New[string, *data.Item](cfg.CacheSize)
 	if err != nil {
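
A side effect of this wiring worth keeping in mind: main now sizes cache.WorkerSemaphore from cfg.CrawlerMaxWorkers, and NewWorkerPool sizes its channel from cap(WorkerSemaphore), so the semaphore must be created before any pool is constructed. The sketch below illustrates that ordering under assumed stand-in names (Worker, workerSemaphore, newWorkerPool); it is not code from this repository.

package main

import "fmt"

// Worker stands in for the repository's cache.Worker type.
type Worker struct{ id int }

// workerSemaphore mirrors cache.WorkerSemaphore; its capacity is the worker cap.
var workerSemaphore chan struct{}

// newWorkerPool mirrors NewWorkerPool: the idle-worker pool is sized from the semaphore.
func newWorkerPool() chan *Worker {
	return make(chan *Worker, cap(workerSemaphore))
}

func main() {
	maxWorkers := 200                                 // stands in for cfg.CrawlerMaxWorkers
	workerSemaphore = make(chan struct{}, maxWorkers) // must exist before the pool is built
	pool := newWorkerPool()
	fmt.Println("pool capacity:", cap(pool)) // 200, inherited from the semaphore's capacity
}

If the pool were built first, cap of the still-nil semaphore would be 0, so Put would never retain an idle worker for reuse.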