make workers global, fix worker setup, clean up

This commit is contained in:
Cyberes 2023-12-11 18:50:30 -07:00
parent 7078712bc3
commit 157f80a463
8 changed files with 96 additions and 107 deletions

View File

@ -33,7 +33,7 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
// Ignore symlinks
return nil
} else {
log.Warnf("NewItem - Path does not exist: %s", fullPath)
log.Warnf("NewItem - StartPath does not exist: %s", fullPath)
return nil
}
}
@ -65,7 +65,7 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
}
if os.IsNotExist(err) {
log.Warnf("Path does not exist: %s", fullPath)
log.Warnf("StartPath does not exist: %s", fullPath)
return nil
} else if err != nil {
log.Warnf("Error detecting MIME type of file %s - %v", fullPath, err)

View File

@ -3,7 +3,6 @@ package api
import (
"crazyfs/CacheItem"
"crazyfs/api/helpers"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
"crazyfs/elastic"
"crypto/sha256"
@ -30,12 +29,11 @@ func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cac
cacheLen := sharedCache.Len()
response := map[string]interface{}{
"cache_size": cacheLen,
"cache_max": config.GetConfig().CacheSize,
"crawls_running": DirectoryCrawler.GetTotalActiveCrawls(),
"busy_workers": DirectoryCrawler.BusyWorkers,
"new_sync_running": elastic.ElasticRefreshSyncRunning,
"refresh_sync_running": elastic.ElasticRefreshSyncRunning,
"cachedItems": cacheLen,
"cacheMax": config.GetConfig().CacheSize,
"recacheCrawlLimit": config.GetConfig().CacheRecacheCrawlerLimit,
"newSyncRunning": elastic.ElasticRefreshSyncRunning,
"refreshSyncRunning": elastic.ElasticRefreshSyncRunning,
}
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json")

View File

@ -27,8 +27,14 @@ func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Ca
return
} else {
response := map[string]interface{}{
"crawls": map[string]interface{}{
"active": DirectoryCrawler.GetActiveCrawls(),
"finished": DirectoryCrawler.GetFinishedCrawls(),
},
"workers": map[string]interface{}{
"busy": DirectoryCrawler.BusyWorkers,
"max": config.GetConfig().DirectoryCrawlers,
},
}
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json")

View File

@ -13,6 +13,9 @@ import (
// WorkerPool is a buffered channel acting as a semaphore to limit the number of active workers globally
var WorkerPool chan struct{}
// Jobs is a global channel that all Walker instances submit jobs to
var Jobs chan WalkJob
// BusyWorkers is an atomic counter for the number of active workers
var BusyWorkers int32
@ -20,15 +23,53 @@ var BusyWorkers int32
// to a walker function, does not point to a directory
var ErrNotDir = errors.New("not a directory")
// WalkJob is a job that's passed to the workers.
type WalkJob struct {
StartPath string
Walker *Walker
}
// Walker is constructed for each Walk() function invocation
type Walker struct {
wg sync.WaitGroup
jobs chan string
root string
followSymlinks bool
walkFunc filepath.WalkFunc
}
// InitializeWorkers starts the number of workers defined by config.GetConfig().DirectoryCrawlers
func InitializeWorkers() {
WorkerPool = make(chan struct{}, config.GetConfig().DirectoryCrawlers)
Jobs = make(chan WalkJob, config.GetConfig().CacheSize)
for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
go worker()
}
log.Debugf("Started %d directory crawler workers.", config.GetConfig().DirectoryCrawlers)
}
// worker processes all the jobs until the jobs channel is explicitly closed
func worker() {
for job := range Jobs {
WorkerPool <- struct{}{} // acquire a worker
atomic.AddInt32(&BusyWorkers, 1) // increment the number of active workers
err := job.Walker.processPath(job.StartPath)
if err != nil {
log.Warnf("worker - %s - %s", job.StartPath, err)
}
job.Walker.wg.Done() // decrement the WaitGroup counter
<-WorkerPool // release the worker when done
atomic.AddInt32(&BusyWorkers, -1) // decrement the number of active workers
}
}
// addJob increments the job counter
// and pushes the path to the jobs channel
func (w *Walker) addJob(job WalkJob) {
w.wg.Add(1)
Jobs <- job
}
// the readDirNames function below was taken from the original
// implementation (see https://golang.org/src/path/filepath/path.go)
// but has sorting removed (sorting doesn't make sense
@ -57,8 +98,8 @@ func readDirNames(dirname string) ([]string, error) {
// lstat is a wrapper for os.Lstat which accepts a path
// relative to Walker.root and also follows symlinks
func (w *Walker) lstat(relpath string) (info os.FileInfo, err error) {
path := filepath.Join(w.root, relpath)
func (w *Walker) lstat(relPath string) (info os.FileInfo, err error) {
path := filepath.Join(w.root, relPath)
info, err = os.Lstat(path)
if err != nil {
return nil, err
@ -81,82 +122,51 @@ func (w *Walker) lstat(relpath string) (info os.FileInfo, err error) {
// processPath processes one directory and adds
// its subdirectories to the queue for further processing
func (w *Walker) processPath(relpath string) error {
defer w.wg.Done()
path := filepath.Join(w.root, relpath)
names, err := readDirNames(path)
func (w *Walker) processPath(relPath string) error {
fullPath := filepath.Join(w.root, relPath)
names, err := readDirNames(fullPath)
if err != nil {
log.Errorf("Walker - processPath - readDirNames - %s", err)
return err
}
for _, name := range names {
subpath := filepath.Join(relpath, name)
info, err := w.lstat(subpath)
subPath := filepath.Join(relPath, name)
info, err := w.lstat(subPath)
if err != nil {
log.Warnf("processPath - %s - %s", relpath, err)
log.Warnf("processPath - %s - %s", relPath, err)
continue
}
if info == nil {
log.Warnf("processPath - %s - %s", relpath, err)
log.Warnf("processPath - %s - %s", relPath, err)
continue
}
err = w.walkFunc(filepath.Join(w.root, subpath), info, err)
subPathFull := filepath.Join(w.root, subPath)
err = w.walkFunc(subPathFull, info, err)
if errors.Is(err, filepath.SkipDir) {
return nil
}
if info.Mode().IsDir() {
w.addJob(subpath)
w.addJob(WalkJob{
StartPath: subPath,
Walker: w,
})
}
}
return nil
}
// addJob increments the job counter
// and pushes the path to the jobs channel
func (w *Walker) addJob(path string) {
w.wg.Add(1)
select {
// try to push the job to the channel
case w.jobs <- path: // ok
default: // buffer overflow
// process job synchronously
err := w.processPath(path)
if err != nil {
log.Warnf("addJob - %s - %s", path, err)
}
}
}
// worker processes all the jobs until the jobs channel is explicitly closed
func (w *Walker) worker() {
for path := range w.jobs {
WorkerPool <- struct{}{} // acquire a worker
atomic.AddInt32(&BusyWorkers, 1) // increment the number of active workers
err := w.processPath(path)
if err != nil {
log.Warnf("worker - %s", err)
}
<-WorkerPool // release the worker when done
atomic.AddInt32(&BusyWorkers, -1) // decrement the number of active workers
}
}
// Walk recursively descends into subdirectories, calling walkFn for each file or directory
// in the tree, including the root directory.
func (w *Walker) Walk(relpath string, walkFn filepath.WalkFunc) error {
w.jobs = make(chan string, config.GetConfig().DirectoryCrawlers)
func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error {
w.walkFunc = walkFn
info, err := w.lstat(relpath)
err = w.walkFunc(filepath.Join(w.root, relpath), info, err)
fullPath := filepath.Join(w.root, relPath)
info, err := w.lstat(relPath)
err = w.walkFunc(fullPath, info, err)
if errors.Is(err, filepath.SkipDir) {
return nil
}
@ -165,21 +175,18 @@ func (w *Walker) Walk(relpath string, walkFn filepath.WalkFunc) error {
}
if info == nil {
return fmt.Errorf("broken symlink: %s", relpath)
return fmt.Errorf("broken symlink: %s", relPath)
}
if !info.Mode().IsDir() {
return ErrNotDir
}
// Spawn workers
for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
go w.worker()
}
w.addJob(relpath) // add this path as a first job
w.addJob(WalkJob{
StartPath: relPath,
Walker: w,
}) // add this path as a first job
w.wg.Wait() // wait till all paths are processed
close(w.jobs) // signal workers to close
return nil
}

View File

@ -49,7 +49,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
}
}
} else {
// Path is a file
// StartPath is a file
dc.AddCacheItem(fullPath, info)
}
return nil

View File

@ -59,8 +59,7 @@ func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGr
func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], logFn func(format string, args ...interface{})) {
defer ticker.Stop()
for range ticker.C {
activeWorkers := int(DirectoryCrawler.BusyWorkers)
runningCrawls := DirectoryCrawler.GetTotalActiveCrawls()
logFn("%s - %d/%d items in the cache. Active workers: %d Active crawls: %d", msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, activeWorkers, runningCrawls)
logFn("%s - %d/%d items in the cache. Busy workers: %d, running crawls: %d",
msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, DirectoryCrawler.BusyWorkers, DirectoryCrawler.GetTotalActiveCrawls())
}
}

View File

@ -14,11 +14,9 @@ type Config struct {
HTTPPort string
CrawlModeCrawlInterval int
DirectoryCrawlers int
CrawlWorkers int
CacheSize int
CacheTime int
CachePrintNew bool
CachePrintChanges bool
InitialCrawl bool
CacheRecacheCrawlerLimit int
CrawlerParseMIME bool
@ -31,7 +29,6 @@ type Config struct {
RestrictedDownloadPaths []string
ApiSearchMaxResults int
ApiSearchShowChildren bool
WorkersJobQueueSize int
ElasticsearchEnable bool
ElasticsearchEndpoint string
ElasticsearchSyncEnable bool
@ -59,8 +56,7 @@ func SetConfig(configFile string) (*Config, error) {
viper.SetDefault("watch_interval", 1)
viper.SetDefault("watch_mode", "crawl")
viper.SetDefault("crawl_mode_crawl_interval", 3600)
viper.SetDefault("directory_crawlers", 4)
viper.SetDefault("crawl_workers", 10)
viper.SetDefault("directory_crawlers", 10)
viper.SetDefault("cache_size", 100000000)
viper.SetDefault("cache_time", 30)
viper.SetDefault("cache_print_new", false)
@ -110,24 +106,22 @@ func SetConfig(configFile string) (*Config, error) {
rootDir = "/"
}
workersJobQueueSizeValue := viper.GetInt("crawler_worker_job_queue_size")
var workersJobQueueSize int
if workersJobQueueSizeValue == 0 {
workersJobQueueSize = viper.GetInt("crawl_workers") * 100
} else {
workersJobQueueSize = workersJobQueueSizeValue
}
//workersJobQueueSizeValue := viper.GetInt("crawler_worker_job_queue_size")
//var workersJobQueueSize int
//if workersJobQueueSizeValue == 0 {
// workersJobQueueSize = viper.GetInt("crawl_workers") * 100
//} else {
// workersJobQueueSize = workersJobQueueSizeValue
//}
config := &Config{
RootDir: rootDir,
HTTPPort: viper.GetString("http_port"),
CrawlModeCrawlInterval: viper.GetInt("crawl_mode_crawl_interval"),
DirectoryCrawlers: viper.GetInt("crawl_mode_crawl_interval"),
CrawlWorkers: viper.GetInt("crawl_workers"),
DirectoryCrawlers: viper.GetInt("directory_crawlers"),
CacheSize: viper.GetInt("cache_size"),
CacheTime: viper.GetInt("cache_time"),
CachePrintNew: viper.GetBool("cache_print_new"),
CachePrintChanges: viper.GetBool("cache_print_changes"),
InitialCrawl: viper.GetBool("initial_crawl"),
CacheRecacheCrawlerLimit: viper.GetInt("cache_recache_crawler_limit"),
CrawlerParseMIME: viper.GetBool("crawler_parse_mime"),
@ -140,7 +134,6 @@ func SetConfig(configFile string) (*Config, error) {
RestrictedDownloadPaths: restrictedPaths,
ApiSearchMaxResults: viper.GetInt("api_search_max_results"),
ApiSearchShowChildren: viper.GetBool("api_search_show_children"),
WorkersJobQueueSize: workersJobQueueSize,
ElasticsearchEnable: viper.GetBool("elasticsearch_enable"),
ElasticsearchEndpoint: viper.GetString("elasticsearch_endpoint"),
ElasticsearchSyncEnable: viper.GetBool("elasticsearch_sync_enable"),
@ -165,10 +158,6 @@ func SetConfig(configFile string) (*Config, error) {
return nil, errors.New("crawl_mode_crawl_interval must be more than 1")
}
if config.CrawlWorkers < 1 {
return nil, errors.New("crawl_workers must be more than 1")
}
if config.CacheSize < 1 {
return nil, errors.New("crawl_workers must be more than 1")
}

View File

@ -39,14 +39,6 @@ type cliConfig struct {
// TODO: admin api endpoint to get status and progress of the full refresh of elasticsearch
func main() {
//fullPath := "/srv/chub-archive"
//RootDir := "/srv/chub-archive"
//
//fmt.Println(strings.HasPrefix(fullPath, RootDir))
////fmt.Println(fullPath != RootDir)
//
//return
cliArgs := parseArgs()
if cliArgs.help {
flag.Usage()
@ -101,9 +93,7 @@ func main() {
log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable)
// Init global variables
//DirectoryCrawler.CrawlWorkerPool = DirectoryCrawler.NewWorkerPool(config.MaxWorkers)
DirectoryCrawler.WorkerPool = make(chan struct{}, cfg.CrawlWorkers)
DirectoryCrawler.InitializeWorkers()
cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)
@ -166,7 +156,7 @@ func main() {
func parseArgs() cliConfig {
var cliArgs cliConfig
flag.StringVar(&cliArgs.configFile, "config", "", "Path to the config file")
flag.StringVar(&cliArgs.configFile, "config", "", "StartPath to the config file")
flag.BoolVar(&cliArgs.initialCrawl, "initial-crawl", false, "Do an initial crawl to fill the cache")
flag.BoolVar(&cliArgs.initialCrawl, "i", false, "Do an initial crawl to fill the cache")
flag.BoolVar(&cliArgs.debug, "d", false, "Enable debug mode")