fix memory usage related to the worker queue size, reorganize things

This commit is contained in:
Cyberes 2023-12-11 21:35:44 -07:00
parent b5327e0c67
commit 2579c76f04
12 changed files with 292 additions and 217 deletions

54
src/Workers/Queue.go Normal file
View File

@ -0,0 +1,54 @@
package Workers
import (
"sync"
)
// This is a queue implementation that doesn't rely on channels. This way of doing things should be more memory-efficient.
// WalkJob is an individual job passed to the Workers.
type WalkJob struct {
StartPath string
Walker *Walker // A pointer to the shared Walker object is passed as well.
}
// WalkJobQueue is the Queue that workers pull jobs from.
type WalkJobQueue struct {
jobs []WalkJob
mutex sync.Mutex
cond *sync.Cond
}
func NewJobQueue() *WalkJobQueue {
q := &WalkJobQueue{}
q.cond = sync.NewCond(&q.mutex)
return q
}
// AddJob adds a job to the queue and signals the workers so they know to pick it up.
func (q *WalkJobQueue) AddJob(job WalkJob) {
q.mutex.Lock()
q.jobs = append(q.jobs, job)
q.mutex.Unlock()
q.cond.Signal()
}
// GetJob is how a worker pulls a job from the queue.
func (q *WalkJobQueue) GetJob() WalkJob {
q.mutex.Lock()
defer q.mutex.Unlock()
for len(q.jobs) == 0 {
q.cond.Wait()
}
job := q.jobs[0]
q.jobs = q.jobs[1:]
return job
}
// GetQueueSize returns the size of the queue.
func (q *WalkJobQueue) GetQueueSize() int {
return len(q.jobs)
}

12
src/Workers/Walk.go Normal file
View File

@ -0,0 +1,12 @@
package Workers
import "path/filepath"
// Walk is a wrapper function for the Walker object that mimics the behavior of filepath.Walk, and doesn't follow symlinks.
func Walk(root string, followSymlinks bool, walkFn filepath.WalkFunc) error {
w := Walker{
root: root,
followSymlinks: followSymlinks,
}
return w.Walk("", walkFn)
}

103
src/Workers/Walker.go Normal file
View File

@ -0,0 +1,103 @@
package Workers
import (
"errors"
"fmt"
"path/filepath"
"sync"
)
// Walker.go is the implementation behind `Walk()`, which is a filesystem walk
// using workers that pull jobs from a queue.
// ErrNotDir indicates that the path, which is being passed to a walker function, does not point to a directory.
var ErrNotDir = errors.New("not a directory")
// Walker is constructed for each Walk() function invocation.
type Walker struct {
wg sync.WaitGroup // used to sync the progress of all workers.
root string
followSymlinks bool
walkFunc filepath.WalkFunc
}
// addJob increments the job counter and pushes the path to the job queue.
func (w *Walker) addJob(job WalkJob) {
w.wg.Add(1)
Queue.AddJob(job)
}
// processPath processes one path.
func (w *Walker) processPath(relPath string) error {
fullPath := filepath.Join(w.root, relPath)
names, err := readDirNames(fullPath)
if err != nil {
log.Errorf("Walker - processPath - readDirNames - %s", err)
return err
}
// Iterate over the path's children.
for _, name := range names {
subPath := filepath.Join(relPath, name)
info, err := w.lstat(subPath)
if err != nil {
log.Warnf("processPath - %s - %s", relPath, err)
continue
}
if info == nil {
log.Warnf("processPath - %s - %s", relPath, err)
continue
}
subPathFull := filepath.Join(w.root, subPath)
err = w.walkFunc(subPathFull, info, err)
if errors.Is(err, filepath.SkipDir) {
return nil
}
// If this child is a directory, add it to the queue then move on.
if info.Mode().IsDir() {
w.addJob(WalkJob{
StartPath: subPath,
Walker: w,
})
}
}
return nil
}
// Walk recursively descends into subdirectories, calling the user-defined walkFn for each file or directory
// in the tree, starting with the root directory. It is only called one place: `Walk()` in Walk.go
func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error {
w.walkFunc = walkFn
// Parse the beginning path.
fullPath := filepath.Join(w.root, relPath)
info, err := w.lstat(relPath)
err = w.walkFunc(fullPath, info, err)
if errors.Is(err, filepath.SkipDir) {
return nil
}
if err != nil {
return err
}
if info == nil {
return fmt.Errorf("broken symlink: %s", relPath)
}
if !info.Mode().IsDir() {
return ErrNotDir
}
// Let the workers handle everything else.
w.addJob(WalkJob{
StartPath: relPath,
Walker: w,
})
// Wait for the workers to finish reading the file system.
w.wg.Wait()
return nil
}

58
src/Workers/file.go Normal file
View File

@ -0,0 +1,58 @@
package Workers
import (
"os"
"path/filepath"
)
// file.go holds functions related to the filesystem.
// the readDirNames function below was taken from the original
// implementation (see https://golang.org/src/path/filepath/path.go)
// but has sorting removed (sorting doesn't make sense
// in concurrent execution, anyway)
// readDirNames reads the directory named by dirname and returns
// a list of directory entries.
func readDirNames(dirname string) ([]string, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
defer func() {
cerr := f.Close()
if err == nil {
err = cerr
}
}()
names, err := f.Readdirnames(-1)
if err != nil {
return nil, err
}
return names, nil
}
// lstat is a wrapper for os.Lstat which accepts a path
// relative to Walker.root and also follows symlinks
func (w *Walker) lstat(relPath string) (info os.FileInfo, err error) {
path := filepath.Join(w.root, relPath)
info, err = os.Lstat(path)
if err != nil {
return nil, err
}
// check if this is a symlink
if w.followSymlinks {
if info.Mode()&os.ModeSymlink > 0 {
path, err = filepath.EvalSymlinks(path)
if err != nil {
return nil, err
}
info, err = os.Lstat(path)
if err != nil {
return nil, err
}
}
}
return
}

12
src/Workers/init.go Normal file
View File

@ -0,0 +1,12 @@
package Workers
import (
"crazyfs/logging"
"github.com/sirupsen/logrus"
)
var log *logrus.Logger
func init() {
log = logging.GetLogger()
}

40
src/Workers/worker.go Normal file
View File

@ -0,0 +1,40 @@
package Workers
import (
"crazyfs/config"
"sync/atomic"
)
// worker.go holds the worker function for `Walk()`.
// Queue is the global walk job queue.
var Queue *WalkJobQueue
// BusyWorkers is an atomic counter for the number of active Workers
var BusyWorkers int32
// InitializeWorkers starts the number of Workers defined by the config.
func InitializeWorkers() {
Queue = NewJobQueue()
for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
go worker()
}
log.Debugf("Started %d directory crawler Workers.", config.GetConfig().DirectoryCrawlers)
}
// worker processes jobs forever.
func worker() {
for {
// Get a job from the queue. This is thread-safe because `GetJob()` locks the queue while reading.
job := Queue.GetJob()
atomic.AddInt32(&BusyWorkers, 1) // increment the number of active Workers
err := job.Walker.processPath(job.StartPath)
if err != nil {
log.Warnf("Workers - %s - %s", job.StartPath, err)
}
job.Walker.wg.Done() // decrement the WaitGroup counter
atomic.AddInt32(&BusyWorkers, -1) // decrement the number of active Workers
}
}

View File

@ -2,6 +2,7 @@ package api
import (
"crazyfs/CacheItem"
"crazyfs/Workers"
"crazyfs/api/helpers"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
@ -32,8 +33,11 @@ func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Ca
"finished": DirectoryCrawler.GetFinishedCrawls(),
},
"workers": map[string]interface{}{
"busy": DirectoryCrawler.BusyWorkers,
"max": config.GetConfig().DirectoryCrawlers,
"busy": Workers.BusyWorkers,
"alive": config.GetConfig().DirectoryCrawlers,
},
"queue": map[string]interface{}{
"size": Workers.Queue.GetQueueSize(),
},
}
w.Header().Set("Cache-Control", "no-store")

View File

@ -1,202 +0,0 @@
package DirectoryCrawler
import (
"crazyfs/config"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
)
// WorkerPool is a buffered channel acting as a semaphore to limit the number of active workers globally
var WorkerPool chan struct{}
// Jobs is a global channel that all Walker instances submit jobs to
var Jobs chan WalkJob
// BusyWorkers is an atomic counter for the number of active workers
var BusyWorkers int32
// ErrNotDir indicates that the path, which is being passed
// to a walker function, does not point to a directory
var ErrNotDir = errors.New("not a directory")
// WalkJob is a job that's passed to the workers.
type WalkJob struct {
StartPath string
Walker *Walker
}
// Walker is constructed for each Walk() function invocation
type Walker struct {
wg sync.WaitGroup
root string
followSymlinks bool
walkFunc filepath.WalkFunc
}
// InitializeWorkers starts the number of workers defined by config.GetConfig().DirectoryCrawlers
func InitializeWorkers() {
WorkerPool = make(chan struct{}, config.GetConfig().DirectoryCrawlers)
Jobs = make(chan WalkJob, config.GetConfig().CacheSize)
for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
go worker()
}
log.Debugf("Started %d directory crawler workers.", config.GetConfig().DirectoryCrawlers)
}
// worker processes all the jobs until the jobs channel is explicitly closed
func worker() {
for job := range Jobs {
WorkerPool <- struct{}{} // acquire a worker
atomic.AddInt32(&BusyWorkers, 1) // increment the number of active workers
err := job.Walker.processPath(job.StartPath)
if err != nil {
log.Warnf("worker - %s - %s", job.StartPath, err)
}
job.Walker.wg.Done() // decrement the WaitGroup counter
<-WorkerPool // release the worker when done
atomic.AddInt32(&BusyWorkers, -1) // decrement the number of active workers
}
}
// addJob increments the job counter
// and pushes the path to the jobs channel
func (w *Walker) addJob(job WalkJob) {
w.wg.Add(1)
Jobs <- job
}
// the readDirNames function below was taken from the original
// implementation (see https://golang.org/src/path/filepath/path.go)
// but has sorting removed (sorting doesn't make sense
// in concurrent execution, anyway)
// readDirNames reads the directory named by dirname and returns
// a list of directory entries.
func readDirNames(dirname string) ([]string, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
defer func() {
cerr := f.Close()
if err == nil {
err = cerr
}
}()
names, err := f.Readdirnames(-1)
if err != nil {
return nil, err
}
return names, nil
}
// lstat is a wrapper for os.Lstat which accepts a path
// relative to Walker.root and also follows symlinks
func (w *Walker) lstat(relPath string) (info os.FileInfo, err error) {
path := filepath.Join(w.root, relPath)
info, err = os.Lstat(path)
if err != nil {
return nil, err
}
// check if this is a symlink
if w.followSymlinks {
if info.Mode()&os.ModeSymlink > 0 {
path, err = filepath.EvalSymlinks(path)
if err != nil {
return nil, err
}
info, err = os.Lstat(path)
if err != nil {
return nil, err
}
}
}
return
}
// processPath processes one directory and adds
// its subdirectories to the queue for further processing
func (w *Walker) processPath(relPath string) error {
fullPath := filepath.Join(w.root, relPath)
names, err := readDirNames(fullPath)
if err != nil {
log.Errorf("Walker - processPath - readDirNames - %s", err)
return err
}
for _, name := range names {
subPath := filepath.Join(relPath, name)
info, err := w.lstat(subPath)
if err != nil {
log.Warnf("processPath - %s - %s", relPath, err)
continue
}
if info == nil {
log.Warnf("processPath - %s - %s", relPath, err)
continue
}
subPathFull := filepath.Join(w.root, subPath)
err = w.walkFunc(subPathFull, info, err)
if errors.Is(err, filepath.SkipDir) {
return nil
}
if info.Mode().IsDir() {
w.addJob(WalkJob{
StartPath: subPath,
Walker: w,
})
}
}
return nil
}
// Walk recursively descends into subdirectories, calling walkFn for each file or directory
// in the tree, including the root directory.
func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error {
w.walkFunc = walkFn
fullPath := filepath.Join(w.root, relPath)
info, err := w.lstat(relPath)
err = w.walkFunc(fullPath, info, err)
if errors.Is(err, filepath.SkipDir) {
return nil
}
if err != nil {
return err
}
if info == nil {
return fmt.Errorf("broken symlink: %s", relPath)
}
if !info.Mode().IsDir() {
return ErrNotDir
}
w.addJob(WalkJob{
StartPath: relPath,
Walker: w,
}) // add this path as a first job
w.wg.Wait() // wait till all paths are processed
return nil
}
// Walk is a wrapper function for the Walker object
// that mimics the behavior of filepath.Walk,
// and doesn't follow symlinks.
func Walk(root string, followSymlinks bool, walkFn filepath.WalkFunc) error {
w := Walker{
root: root,
followSymlinks: followSymlinks,
}
return w.Walk("", walkFn)
}

View File

@ -2,6 +2,7 @@ package DirectoryCrawler
import (
"crazyfs/CacheItem"
"crazyfs/Workers"
"crazyfs/config"
"crazyfs/file"
"errors"
@ -78,7 +79,7 @@ func (dc *DirectoryCrawler) Crawl(fullPath string) error {
}
// If the path is a directory, start a walk
err := Walk(fullPath, config.FollowSymlinks, dc.walkRecursiveFunc)
err := Workers.Walk(fullPath, config.FollowSymlinks, dc.walkRecursiveFunc)
if err != nil {
log.Errorf("CRAWLER - crawl for %s failed: %s", fullPath, err)
}

View File

@ -2,6 +2,7 @@ package cache
import (
"crazyfs/CacheItem"
"crazyfs/Workers"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
"crazyfs/logging"
@ -59,7 +60,7 @@ func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGr
func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], logFn func(format string, args ...interface{})) {
defer ticker.Stop()
for range ticker.C {
logFn("%s - %d/%d items in the cache. Busy workers: %d, running crawls: %d",
msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, DirectoryCrawler.BusyWorkers, DirectoryCrawler.GetTotalActiveCrawls())
logFn("%s - %d/%d items in the cache. Busy Workers: %d. Jobs remaining: %d. Running crawls: %d",
msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())
}
}

View File

@ -106,14 +106,6 @@ func SetConfig(configFile string) (*Config, error) {
rootDir = "/"
}
//workersJobQueueSizeValue := viper.GetInt("crawler_worker_job_queue_size")
//var workersJobQueueSize int
//if workersJobQueueSizeValue == 0 {
// workersJobQueueSize = viper.GetInt("crawl_workers") * 100
//} else {
// workersJobQueueSize = workersJobQueueSizeValue
//}
config := &Config{
RootDir: rootDir,
HTTPPort: viper.GetString("http_port"),

View File

@ -2,9 +2,9 @@ package main
import (
"crazyfs/CacheItem"
"crazyfs/Workers"
"crazyfs/api"
"crazyfs/cache"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
"crazyfs/elastic"
"crazyfs/logging"
@ -93,7 +93,7 @@ func main() {
log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable)
DirectoryCrawler.InitializeWorkers()
Workers.InitializeWorkers()
cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)