diff --git a/src/cache/DirectoryCrawler.go b/src/cache/DirectoryCrawler.go
index c759382..ddef3f0 100644
--- a/src/cache/DirectoryCrawler.go
+++ b/src/cache/DirectoryCrawler.go
@@ -15,13 +15,6 @@ var WorkerBufferSize int
 var PrintNew bool
 var RootDir string
 var CrawlerParseMIME bool
-var MaxWorkers int
-
-var itemPool = &sync.Pool{
-	New: func() interface{} {
-		return &data.Item{}
-	},
-}
 
 type DirectoryCrawler struct {
 	cache *lru.Cache[string, *data.Item]
diff --git a/src/cache/Worker.go b/src/cache/Worker.go
index c83523b..48e9303 100644
--- a/src/cache/Worker.go
+++ b/src/cache/Worker.go
@@ -32,6 +32,8 @@ func (w *Worker) start(dc *DirectoryCrawler) {
 			dc.cache.Add(StripRootDir(path, RootDir), NewItem(path, info))
 		}
 		w.active = false
+		// Release the token back to the semaphore when the worker is done
+		<-WorkerSemaphore
 	}()
 }
 
diff --git a/src/cache/WorkerPool.go b/src/cache/WorkerPool.go
index 3658a32..49c283a 100644
--- a/src/cache/WorkerPool.go
+++ b/src/cache/WorkerPool.go
@@ -2,6 +2,8 @@ package cache
 
 import "sync"
 
+var WorkerSemaphore chan struct{}
+
 type WorkerPool struct {
 	pool chan *Worker
 	wg   sync.WaitGroup
@@ -9,7 +11,7 @@
 
 func NewWorkerPool() *WorkerPool {
 	return &WorkerPool{
-		pool: make(chan *Worker, MaxWorkers),
+		pool: make(chan *Worker, cap(WorkerSemaphore)), // use the capacity of the semaphore as the size of the pool
 	}
 }
 
@@ -18,6 +20,8 @@ func (p *WorkerPool) Get() *Worker {
 	select {
 	case w := <-p.pool:
 		return w
 	default:
+		// Acquire a token from the semaphore
+		WorkerSemaphore <- struct{}{}
 		return newWorker(len(p.pool))
 	}
 }
@@ -26,7 +30,8 @@ func (p *WorkerPool) Put(w *Worker) {
 	select {
 	case p.pool <- w:
 	default:
-		// If the pool is full, discard the worker
+		// If the pool is full, discard the worker and release the token back to the semaphore
+		<-WorkerSemaphore
 	}
 }
 
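The three files above replace the old fixed MaxWorkers count with WorkerSemaphore, a buffered channel used as a counting semaphore: WorkerPool.Get sends into the channel to acquire a token before spawning a new worker, and the token is handed back either by the worker itself when it finishes or by WorkerPool.Put when a recycled worker no longer fits in the pool. Below is a minimal standalone sketch of this channel-as-semaphore pattern; the names (maxWorkers, the job loop) are illustrative stand-ins, not code from this repository.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxWorkers = 4 // stands in for cfg.CrawlerMaxWorkers

	// A buffered channel of empty structs is a counting semaphore:
	// a send acquires a token, a receive releases one, and the
	// channel capacity caps how many goroutines run at once.
	sem := make(chan struct{}, maxWorkers)
	var wg sync.WaitGroup

	for job := 0; job < 16; job++ {
		sem <- struct{}{} // blocks while maxWorkers goroutines hold tokens
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			defer func() { <-sem }() // always hand the token back
			fmt.Println("processing job", job)
		}(job)
	}
	wg.Wait()
}

struct{} occupies zero bytes, and cap() and len() on the channel report the limit and the tokens currently outstanding, which is exactly what NewWorkerPool exploits by sizing its pool with cap(WorkerSemaphore).
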
diff --git a/src/cache/crawler.go b/src/cache/crawler.go
index 8ea7f26..77a31fb 100644
--- a/src/cache/crawler.go
+++ b/src/cache/crawler.go
@@ -10,6 +10,12 @@ import (
 	"time"
 )
 
+var itemPool = &sync.Pool{
+	New: func() interface{} {
+		return &data.Item{}
+	},
+}
+
 func StartCrawler(basePath string, sharedCache *lru.Cache[string, *data.Item], cfg *config.Config) error {
 	log = logging.GetLogger()
 	var wg sync.WaitGroup
diff --git a/src/config/config.go b/src/config/config.go
index 50bdde8..1d2bf0c 100644
--- a/src/config/config.go
+++ b/src/config/config.go
@@ -12,7 +12,6 @@ type Config struct {
 	WatchMode              string
 	CrawlModeCrawlInterval int
 	DirectoryCrawlers      int
-	CrawlWorkers           int
 	WatchInterval          int
 	CacheSize              int
 	CacheTime              int
@@ -30,7 +29,7 @@ type Config struct {
 	ApiSearchMaxResults      int
 	ApiSearchShowChildren    bool
 	CrawlerChannelBufferSize int
-	CrawlerWorkerPoolSize    int
+	CrawlerMaxWorkers        int
 }
 
 func LoadConfig(configFile string) (*Config, error) {
@@ -40,7 +39,6 @@ func LoadConfig(configFile string) (*Config, error) {
 	viper.SetDefault("watch_mode", "crawl")
 	viper.SetDefault("crawl_mode_crawl_interval", 3600)
 	viper.SetDefault("directory_crawlers", 4)
-	viper.SetDefault("crawl_workers", 10)
 	viper.SetDefault("cache_size", 100000000)
 	viper.SetDefault("cache_time", 30)
 	viper.SetDefault("cache_print_new", false)
@@ -56,7 +54,7 @@ func LoadConfig(configFile string) (*Config, error) {
 	viper.SetDefault("api_search_show_children", false)
 	viper.SetDefault("http_allow_during_initial_crawl", false)
 	viper.SetDefault("crawler_channel_buffer_size", 1000)
-	viper.SetDefault("crawler_worker_pool_size", 200)
+	viper.SetDefault("crawler_max_workers", 200)
 
 	err := viper.ReadInConfig()
 	if err != nil {
@@ -77,7 +75,6 @@ func LoadConfig(configFile string) (*Config, error) {
 		CrawlModeCrawlInterval: viper.GetInt("crawl_mode_crawl_interval"),
 		WatchInterval:          viper.GetInt("watch_interval"),
 		DirectoryCrawlers:      viper.GetInt("crawl_mode_crawl_interval"),
-		CrawlWorkers:           viper.GetInt("crawl_workers"),
 		CacheSize:              viper.GetInt("cache_size"),
 		CacheTime:              viper.GetInt("cache_time"),
 		CachePrintNew:          viper.GetBool("cache_print_new"),
@@ -94,7 +91,7 @@ func LoadConfig(configFile string) (*Config, error) {
 		ApiSearchMaxResults:      viper.GetInt("api_search_max_results"),
 		ApiSearchShowChildren:    viper.GetBool("api_search_show_children"),
 		CrawlerChannelBufferSize: viper.GetInt("crawler_channel_buffer_size"),
-		CrawlerWorkerPoolSize:    viper.GetInt("crawler_worker_pool_size"),
+		CrawlerMaxWorkers:        viper.GetInt("crawler_max_workers"),
 	}
 
 	if config.WatchMode != "crawl" && config.WatchMode != "watch" {
@@ -109,12 +106,8 @@ func LoadConfig(configFile string) (*Config, error) {
 		return nil, errors.New("crawl_mode_crawl_interval must be more than 1")
 	}
 
-	if config.CrawlWorkers < 1 {
-		return nil, errors.New("crawl_workers must be more than 1")
-	}
-
 	if config.CacheSize < 1 {
-		return nil, errors.New("crawl_workers must be more than 1")
+		return nil, errors.New("cache_size must be at least 1")
 	}
 
 	if config.CacheRecacheCrawlerLimit < 1 {
diff --git a/src/crazyfs.go b/src/crazyfs.go
index 4b9b968..21fd6ea 100644
--- a/src/crazyfs.go
+++ b/src/crazyfs.go
@@ -81,11 +81,12 @@ func main() {
 		log.Fatalf("Failed to load config file: %s", err)
 	}
 
+	// Set the cache package's globals from the loaded config
 	cache.WorkerBufferSize = cfg.CrawlerChannelBufferSize
 	cache.PrintNew = cfg.CachePrintNew
 	cache.RootDir = cfg.RootDir
 	cache.CrawlerParseMIME = cfg.CrawlerParseMIME
-	cache.MaxWorkers = cfg.CrawlWorkers
+	cache.WorkerSemaphore = make(chan struct{}, cfg.CrawlerMaxWorkers)
 
 	sharedCache, err := lru.New[string, *data.Item](cfg.CacheSize)
 	if err != nil {
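
The itemPool that crawler.go now owns is a sync.Pool, which recycles short-lived allocations instead of leaving them all to the garbage collector. A self-contained sketch of the pattern follows; the local Item struct is a hypothetical stand-in for data.Item, not the repository's type.

package main

import (
	"fmt"
	"sync"
)

// Item is a hypothetical stand-in for data.Item.
type Item struct {
	Path string
}

// The pool hands out a recycled *Item when one is available and
// falls back to New when it is not, cutting allocation churn for
// objects created and discarded at a high rate during a crawl.
var itemPool = &sync.Pool{
	New: func() interface{} {
		return &Item{}
	},
}

func main() {
	it := itemPool.Get().(*Item) // reused if available, else New()
	it.Path = "/some/path"
	fmt.Println("using", it.Path)

	// Zero the value before returning it, since Get may hand this
	// same pointer to the next caller with stale fields intact.
	*it = Item{}
	itemPool.Put(it)
}

Moving the pool next to StartCrawler keeps it in the file that actually uses it, and the rest of the diff carries the rename of crawler_worker_pool_size to crawler_max_workers (plus the removal of the now-unused crawl_workers setting) through config.go and crazyfs.go.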