refactor and performance improvements
parent f40907dd8a
commit 4e9d3265fd
@@ -1,7 +1,6 @@
 package api
 
 import (
-	"crazyfs/cache"
 	"crazyfs/config"
 	"crazyfs/data"
 	"encoding/json"
@@ -30,27 +29,10 @@ func AdminCacheInfo(w http.ResponseWriter, r *http.Request, cfg *config.Config,
 		cacheKeys = []string{}
 	}
 
-	// Get the running scans and workers
-	//runningScans := cache.GetRunningScans()
-	runningWorkers := cache.GetRunningWorkers()
-
-	// Get the active scans
-	activeScans := cache.GetActiveScans()
-
-	// Convert the active scans map to a list
-	activeScanList := make([]string, 0, len(activeScans))
-	for scan := range activeScans {
-		if activeScans[scan] {
-			activeScanList = append(activeScanList, scan)
-		}
-	}
-
 	response := map[string]interface{}{
-		"cache_size":      cacheLen,
-		"cache_keys":      cacheKeys,
-		"cache_max":       cfg.CacheSize,
-		"running_workers": runningWorkers,
-		"running_scans":   activeScanList,
+		"cache_size": cacheLen,
+		"cache_keys": cacheKeys,
+		"cache_max":  cfg.CacheSize,
 	}
 
 	w.Header().Set("Content-Type", "application/json")
@@ -16,7 +16,7 @@ func HealthCheck(w http.ResponseWriter, r *http.Request, cfg *config.Config, sha
 
 	response := map[string]interface{}{}
 
-	response["scan_running"] = cache.GetRunningScans() > 0
+	//response["scan_running"] = cache.GetRunningScans() > 0
 	response["initial_scan_running"] = cache.InitialCrawlInProgress
 
 	w.Header().Set("Content-Type", "application/json")
@@ -76,7 +76,7 @@ func Download(w http.ResponseWriter, r *http.Request, cfg *config.Config, shared
 	var mimeType string
 	var err error
 	if item.Type == nil {
-		fileExists, mimeType, _, err = cache.GetMimeType(fullPath, true)
+		fileExists, mimeType, _, err = cache.GetFileMime(fullPath, true)
 		if !fileExists {
 			helpers.Return400Msg("file not found", w)
 		}
@@ -90,7 +90,7 @@ func Download(w http.ResponseWriter, r *http.Request, cfg *config.Config, shared
 		})
 		return
 	}
-	// GetMimeType() returns an empty string if it was a directory
+	// GetFileMime() returns an empty string if it was a directory
 	if mimeType != "" {
 		// Update the item's MIME in the sharedCache
 		item.Type = &mimeType
@@ -16,9 +16,10 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
 	log := logging.GetLogger()
 	// If the data is not in the cache, start a new crawler
 	log.Debugf("CRAWLER - %s not in cache, crawling", fullPath)
-	dc := cache.NewDirectoryCrawler(sharedCache)
+	pool := cache.NewWorkerPool()
+	crawler := cache.NewDirectoryCrawler(sharedCache, pool)
 	// We don't want to traverse the entire directory tree since we'll only return the current directory anyways
-	err := dc.CrawlNoRecursion(fullPath, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+	err := crawler.Crawl(fullPath, false)
 	if err != nil {
 		log.Errorf("LIST - crawl failed: %s", err)
 		w.Header().Set("Content-Type", "application/json")
@@ -60,9 +60,10 @@ func ListDir(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedC
 	// Start a recursive crawl in the background.
 	// We've already gotten our cached item (may be null if it doesn't exist) so this won't affect our results
 	go func() {
-		log.Debugf("Starting background recursive crawl for %s", fullPath)
-		dc := cache.NewDirectoryCrawler(sharedCache)
-		err := dc.Crawl(fullPath, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+		log.Debugf("LIST - starting background recursive crawl for %s", fullPath)
+		pool := cache.NewWorkerPool()
+		crawler := cache.NewDirectoryCrawler(sharedCache, pool)
+		err := crawler.Crawl(fullPath, true)
 		if err != nil {
 			log.Errorf("LIST - background recursive crawl failed: %s", err)
 		}
@@ -99,11 +100,11 @@ func ListDir(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedC
 			"error": "not allowed to analyze the mime of directories",
 		})
 		return
-	} else {
+	} else if !item.IsDir {
 		// Only update the mime in the cache if it hasn't been set already.
 		// TODO: need to make sure that when a re-crawl is triggered, the Type is set back to nil
 		if item.Type == nil {
-			fileExists, mimeType, ext, err := cache.GetMimeType(fullPath, true)
+			fileExists, mimeType, ext, err := cache.GetFileMime(fullPath, true)
 			if !fileExists {
 				helpers.Return400Msg("file not found", w)
 			}
@@ -96,7 +96,7 @@ func Thumbnail(w http.ResponseWriter, r *http.Request, cfg *config.Config, share
 	}
 
 	// Get the MIME type of the file
-	fileExists, mimeType, ext, err := cache.GetMimeType(fullPath, true)
+	fileExists, mimeType, ext, err := cache.GetFileMime(fullPath, true)
 	if !fileExists {
 		helpers.Return400Msg("file not found", w)
 	}
@@ -0,0 +1,147 @@
+package cache
+
+import (
+	"crazyfs/data"
+	lru "github.com/hashicorp/golang-lru/v2"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+)
+
+// Config values
+var FollowSymlinks bool
+var WorkerBufferSize int
+var PrintNew bool
+var RootDir string
+var CrawlerParseMIME bool
+var MaxWorkers int
+
+var itemPool = &sync.Pool{
+	New: func() interface{} {
+		return &data.Item{}
+	},
+}
+
+type DirectoryCrawler struct {
+	cache *lru.Cache[string, *data.Item]
+	pool  *WorkerPool
+}
+
+func NewDirectoryCrawler(cache *lru.Cache[string, *data.Item], pool *WorkerPool) *DirectoryCrawler {
+	return &DirectoryCrawler{cache: cache, pool: pool}
+}
+
+func (dc *DirectoryCrawler) Crawl(path string, recursive bool) error {
+	info, err := os.Stat(path)
+	if os.IsNotExist(err) {
+		// If the path doesn't exist, just silently exit
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	// Get a list of all keys in the cache that belong to this directory
+	keys := make([]string, 0)
+	for _, key := range dc.cache.Keys() {
+		if strings.HasPrefix(key, path) {
+			keys = append(keys, key)
+		}
+	}
+
+	// Remove all entries in the cache that belong to this directory so we can start fresh
+	for _, key := range keys {
+		dc.cache.Remove(key)
+	}
+
+	if info.IsDir() {
+		// If the path is a directory, walk the directory
+		var wg sync.WaitGroup
+		err := dc.walkDir(path, &wg, info, recursive)
+		if err != nil {
+			log.Errorf("CRAWLER - dc.walkDir() in Crawl() returned error: %s", err)
+		}
+	} else {
+		// If the path is a file, add it to the cache directly
+		dc.cache.Add(StripRootDir(path, RootDir), NewItem(path, info))
+	}
+
+	return nil
+}
+
+func (dc *DirectoryCrawler) walkDir(dir string, n *sync.WaitGroup, dirInfo os.FileInfo, recursive bool) error {
+	// We are handling errors for each file or directory individually. Does this slow things down?
+
+	entries, err := os.ReadDir(dir)
+	if err != nil {
+		log.Errorf("CRAWLER - walkDir() failed to read directory %s: %s", dir, err)
+		return err
+	}
+
+	// Create the directory item but don't add it to the cache yet
+	dirItem := NewItem(dir, dirInfo)
+
+	for _, entry := range entries {
+		subpath := filepath.Join(dir, entry.Name())
+		info, err := os.Lstat(subpath) // update the info var with the new entry
+		if err != nil {
+			log.Warnf("CRAWLER - walkDir() failed to stat subpath %s: %s", subpath, err)
+			continue
+		}
+		if FollowSymlinks && info.Mode()&os.ModeSymlink != 0 {
+			link, err := os.Readlink(subpath)
+			if err != nil {
+				log.Warnf("CRAWLER - walkDir() failed to read symlink %s: %s", subpath, err)
+				continue
+			}
+			info, err = os.Stat(link)
+			if err != nil {
+				log.Warnf("CRAWLER - walkDir() failed to stat link %s: %s", link, err)
+				continue
+			}
+		}
+		if entry.IsDir() && recursive {
+			n.Add(1)
+			go func() {
+				defer n.Done() // Move Done() here
+				err := dc.walkDir(subpath, n, info, recursive)
+				if err != nil {
+					log.Errorf("CRAWLER - dc.walkDir() in walkDir() -> IsDir() returned error: %s", err)
+				}
+			}()
+		} else {
+			w := dc.pool.Get()
+			w.add(subpath)
+			dc.pool.Put(w)
+		}
+
+		// Add the entry to the directory's contents
+		entryItem := NewItem(subpath, info)
+		dirItem.Children = append(dirItem.Children, entryItem)
+	}
+
+	// Add the directory to the cache after all of its children have been processed
+	dc.cache.Add(StripRootDir(dir, RootDir), dirItem)
+
+	// If the directory is not the root directory, update the parent directory's Children field
+	if dir != RootDir {
+		parentDir := filepath.Dir(dir)
+		parentItem, found := dc.cache.Get(StripRootDir(parentDir, RootDir))
+		if found {
+			// Remove the old version of the directory from the parent's Children field
+			for i, child := range parentItem.Children {
+				if child.Path == StripRootDir(dir, RootDir) {
+					parentItem.Children = append(parentItem.Children[:i], parentItem.Children[i+1:]...)
+					break
+				}
+			}
+			// Add the new version of the directory to the parent's Children field
+			parentItem.Children = append(parentItem.Children, dirItem)
+			// Update the parent directory in the cache
+			dc.cache.Add(StripRootDir(parentDir, RootDir), parentItem)
+		}
+	}
+
+	return nil
+}
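Note on the new API: every call site in this commit wires the crawler the same way, so a minimal end-to-end sketch may help. The lru setup and the package-level assignments mirror main() further down in this diff; the paths and sizes are hypothetical, not from the source.

	package main

	import (
		"crazyfs/cache"
		"crazyfs/data"

		lru "github.com/hashicorp/golang-lru/v2"
	)

	func main() {
		// main() copies these knobs from the config file; hypothetical values here.
		cache.RootDir = "/srv/files"
		cache.MaxWorkers = 200
		cache.WorkerBufferSize = 1000

		sharedCache, err := lru.New[string, *data.Item](100000)
		if err != nil {
			panic(err)
		}

		pool := cache.NewWorkerPool()
		crawler := cache.NewDirectoryCrawler(sharedCache, pool)

		// recursive=true walks the whole tree; recursive=false caches only the
		// named directory, as HandleFileNotFound does with Crawl(fullPath, false).
		if err := crawler.Crawl("/srv/files/photos", true); err != nil {
			panic(err)
		}
	}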
@@ -0,0 +1,44 @@
+package cache
+
+import (
+	"crazyfs/logging"
+	"os"
+)
+
+type Worker struct {
+	id     int
+	ch     chan string
+	active bool
+}
+
+func newWorker(id int) *Worker {
+	return &Worker{
+		id:     id,
+		ch:     make(chan string, WorkerBufferSize),
+		active: false,
+	}
+}
+
+func (w *Worker) start(dc *DirectoryCrawler) {
+	w.active = true
+	go func() {
+		for path := range w.ch {
+			info, err := os.Stat(path)
+			if err != nil {
+				logger := logging.GetLogger()
+				logger.Errorf("WORKER START - os.Stat() - %s", err)
+				continue
+			}
+			dc.cache.Add(StripRootDir(path, RootDir), NewItem(path, info))
+		}
+		w.active = false
+	}()
+}
+
+func (w *Worker) add(path string) {
+	w.ch <- path
+}
+
+func (w *Worker) stop() {
+	close(w.ch)
+}
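The Worker here is a plain buffered-channel consumer: start() launches the goroutine that stats each queued path and adds it to the cache, add() enqueues and only blocks once the WorkerBufferSize buffer is full, and stop() closes the channel so the goroutine drains the queue and exits. A sketch of that lifecycle from inside the cache package; the explicit start/stop wiring is my assumption, since the hunks shown only obtain workers through the pool and never call start():

	w := newWorker(1)         // assumes crawler is a *DirectoryCrawler built as above
	w.start(crawler)          // consume paths in the background
	w.add("/srv/files/a.txt") // hypothetical path; blocks only on a full buffer
	w.stop()                  // close(w.ch): the goroutine finishes the queue, then exits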
@@ -0,0 +1,39 @@
+package cache
+
+import "sync"
+
+type WorkerPool struct {
+	pool chan *Worker
+	wg   sync.WaitGroup
+}
+
+func NewWorkerPool() *WorkerPool {
+	return &WorkerPool{
+		pool: make(chan *Worker, MaxWorkers),
+	}
+}
+
+func (p *WorkerPool) Get() *Worker {
+	select {
+	case w := <-p.pool:
+		return w
+	default:
+		return newWorker(len(p.pool))
+	}
+}
+
+func (p *WorkerPool) Put(w *Worker) {
+	select {
+	case p.pool <- w:
+	default:
+		// If the pool is full, discard the worker
+	}
+}
+
+func (p *WorkerPool) Wait() {
+	p.wg.Wait()
+}
+
+func (p *WorkerPool) Add(delta int) {
+	p.wg.Add(delta)
+}
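Two properties of this pool are worth spelling out (my reading of the code, not something the commit states). Get() never blocks: it hands back a pooled Worker, or mints a fresh one when the channel is empty. Put() silently drops the Worker once MaxWorkers are already pooled. The pool therefore bounds how many idle workers are kept for reuse rather than capping concurrency; and nothing in the hunks shown calls Add() or Wait(), so the WaitGroup plumbing looks reserved for later use. An in-package usage sketch:

	pool := NewWorkerPool()
	w := pool.Get()           // empty pool: returns newWorker(0), not yet started
	w.add("/srv/files/b.txt") // hypothetical path
	pool.Put(w)               // kept for the next Get(), or discarded when full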
@@ -1,423 +0,0 @@
-package cache
-
-import (
-	"crazyfs/data"
-	"crazyfs/logging"
-	"github.com/gabriel-vasile/mimetype"
-	lru "github.com/hashicorp/golang-lru/v2"
-	"mime"
-	"os"
-	"path/filepath"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"time"
-)
-
-var FollowSymlinks bool
-
-// Add global variables to keep track of running scans and workers
-var runningScans int32
-var runningWorkers int32
-
-// Add maps to keep track of the current directory each worker is on and which directory scans are currently active
-var workerDirs map[int32]string
-var activeScans map[string]bool
-var mapMutex sync.Mutex
-
-func init() {
-	workerDirs = make(map[int32]string)
-	activeScans = make(map[string]bool)
-}
-
-func NewItem(path string, info os.FileInfo, CachePrintNew bool, RootDir string, CrawlerParseMIME bool) *data.Item {
-	if CachePrintNew {
-		log = logging.GetLogger()
-		log.Debugf("CACHE - new: %s", path)
-	}
-
-	var mimeType string
-	var ext string
-	if !info.IsDir() {
-		if CrawlerParseMIME {
-			if !info.IsDir() {
-				ext = filepath.Ext(path)
-				mimeObj, err := mimetype.DetectFile(path)
-				if err != nil {
-					log.Warnf("Error detecting MIME type: %v", err)
-				} else {
-					mimeType = mimeObj.String()
-				}
-			}
-		} else {
-			mimeType = mime.TypeByExtension(ext)
-		}
-	}
-	ext = filepath.Ext(path)
-
-	if strings.Contains(mimeType, ";") {
-		mimeType = strings.Split(mimeType, ";")[0]
-	}
-
-	// Create pointers for mimeType and ext
-	var mimeTypePtr, extPtr *string
-	if mimeType != "" {
-		mimeTypePtr = &mimeType
-	}
-	if ext != "" {
-		extPtr = &ext
-	}
-
-	return &data.Item{
-		Path:      StripRootDir(path, RootDir),
-		Name:      info.Name(),
-		Size:      info.Size(),
-		Extension: extPtr,
-		Modified:  info.ModTime().UTC().Format(time.RFC3339Nano),
-		Mode:      uint32(info.Mode().Perm()),
-		IsDir:     info.IsDir(),
-		IsSymlink: info.Mode()&os.ModeSymlink != 0,
-		Cached:    time.Now().UnixNano() / int64(time.Millisecond), // Set the created time to now in milliseconds
-		Children:  make([]*data.Item, 0),
-		Type:      mimeTypePtr,
-	}
-}
-
-type DirectoryCrawler struct {
-	cache *lru.Cache[string, *data.Item]
-}
-
-func NewDirectoryCrawler(cache *lru.Cache[string, *data.Item]) *DirectoryCrawler {
-	return &DirectoryCrawler{cache: cache}
-}
-
-type worker struct {
-	wg     *sync.WaitGroup
-	addWg  sync.WaitGroup
-	ch     chan string
-	active bool
-	id     int32
-}
-
-func newWorker() *worker {
-	// Increment running workers count
-	id := atomic.AddInt32(&runningWorkers, 1)
-	return &worker{
-		wg:     new(sync.WaitGroup),
-		ch:     make(chan string),
-		active: false,
-		id:     id,
-	}
-}
-
-func (w *worker) start(dc *DirectoryCrawler, CachePrintNew bool, RootDir string, CrawlerParseMIME bool) {
-	w.active = true
-	w.wg.Add(1)
-	go func() {
-		defer w.wg.Done()
-		for path := range w.ch {
-			w.addWg.Add(1)
-			// Update the current directory of the worker
-			mapMutex.Lock()
-			workerDirs[w.id] = path
-			mapMutex.Unlock()
-			info, err := os.Stat(path)
-			if err != nil {
-				// handle error
-				w.addWg.Done()
-				continue
-			}
-			dc.cache.Add(StripRootDir(path, RootDir), NewItem(path, info, CachePrintNew, RootDir, CrawlerParseMIME))
-			w.addWg.Done()
-		}
-		w.active = false
-		// Decrement running workers count
-		atomic.AddInt32(&runningWorkers, -1)
-	}()
-}
-
-func (w *worker) add(path string) {
-	w.ch <- path
-}
-
-func (w *worker) stop() {
-	close(w.ch)
-	w.addWg.Wait()
-	w.wg.Wait()
-}
-
-func (dc *DirectoryCrawler) Crawl(path string, CachePrintNew bool, numWorkers int, RootDir string, CrawlerParseMIME bool) error {
-	if _, err := os.Stat(path); os.IsNotExist(err) {
-		// If the path doesn't exist, just silently exit
-		return nil
-	}
-
-	info, err := os.Stat(path)
-	if err != nil {
-		return err
-	}
-
-	// Increment running scans count
-	atomic.AddInt32(&runningScans, 1)
-	// Mark the scan as active
-	mapMutex.Lock()
-	activeScans[path] = true
-	mapMutex.Unlock()
-
-	// Get a list of all keys in the cache that belong to this directory
-	keys := make([]string, 0)
-	for _, key := range dc.cache.Keys() {
-		if strings.HasPrefix(key, path) {
-			keys = append(keys, key)
-		}
-	}
-
-	// Remove all entries in the cache that belong to this directory so we can start fresh
-	for _, key := range dc.cache.Keys() {
-		if strings.HasPrefix(key, path) {
-			dc.cache.Remove(key)
-		}
-	}
-
-	if info.IsDir() {
-		// If the path is a directory, start workers and walk the directory
-		workers := make([]*worker, numWorkers)
-		for i := 0; i < numWorkers; i++ {
-			workers[i] = newWorker()
-			workers[i].start(dc, CachePrintNew, RootDir, CrawlerParseMIME)
-		}
-		var wg sync.WaitGroup
-		wg.Add(1)
-		go func() {
-			err := dc.walkDir(path, &wg, workers, numWorkers, CachePrintNew, RootDir, CrawlerParseMIME)
-			if err != nil {
-				log.Errorf("CRAWLER - dc.walkDir() in Crawl() returned error: %s", err)
-			}
-		}()
-		wg.Wait()
-		for _, worker := range workers {
-			worker.stop()
-		}
-	} else {
-		// If the path is a file, add it to the cache directly
-		dc.cache.Add(StripRootDir(path, RootDir), NewItem(path, info, CachePrintNew, RootDir, CrawlerParseMIME))
-	}
-
-	// After crawling, remove any keys that are still in the list (these are items that were not found on the filesystem)
-	for _, key := range keys {
-		dc.cache.Remove(key)
-	}
-
-	// Mark the scan as inactive
-	mapMutex.Lock()
-	activeScans[path] = false
-	mapMutex.Unlock()
-	// Decrement running scans count
-	atomic.AddInt32(&runningScans, -1)
-	return nil
-}
-
-func (dc *DirectoryCrawler) walkDir(dir string, n *sync.WaitGroup, workers []*worker, numWorkers int, CachePrintNew bool, RootDir string, CrawlerParseMIME bool) error {
-	defer n.Done()
-
-	entries, err := os.ReadDir(dir)
-	if err != nil {
-		log.Errorf("CRAWLER - walkDir() failed to read directory %s: %s", dir, err)
-		return err
-	}
-
-	// Create the directory item but don't add it to the cache yet
-	info, err := os.Stat(dir)
-	if err != nil {
-		log.Errorf("CRAWLER - walkDir() failed to stat %s: %s", dir, err)
-		return err
-	}
-	dirItem := NewItem(dir, info, CachePrintNew, RootDir, CrawlerParseMIME)
-
-	i := 0
-	for _, entry := range entries {
-		subpath := filepath.Join(dir, entry.Name())
-		info, err = os.Lstat(subpath)
-		if err != nil {
-			log.Warnf("CRAWLER - walkDir() failed to stat subpath %s: %s", subpath, err)
-			continue
-		}
-		if FollowSymlinks && info.Mode()&os.ModeSymlink != 0 {
-			link, err := os.Readlink(subpath)
-			if err != nil {
-				log.Warnf("CRAWLER - walkDir() failed to read symlink %s: %s", subpath, err)
-				continue
-			}
-			linkInfo, err := os.Stat(link)
-			if err != nil {
-				log.Warnf("CRAWLER - walkDir() failed to stat link %s: %s", link, err)
-				continue
-			}
-			if linkInfo.IsDir() {
-				n.Add(1)
-				go func() {
-					err := dc.walkDir(link, n, workers, numWorkers, CachePrintNew, RootDir, CrawlerParseMIME)
-					if err != nil {
-						log.Errorf("CRAWLER - dc.walkDir() in walkDir() -> follow symlinks returned error: %s", err)
-					}
-				}()
-			}
-		} else if entry.IsDir() {
-			n.Add(1)
-			go func() {
-				err := dc.walkDir(subpath, n, workers, numWorkers, CachePrintNew, RootDir, CrawlerParseMIME)
-				if err != nil {
-					log.Errorf("CRAWLER - dc.walkDir() in walkDir() -> IsDir() returned error: %s", err)
-				}
-			}()
-		} else {
-			workers[i%numWorkers].add(subpath)
-			i++
-		}
-
-		// Add the entry to the directory's contents
-		entryItem := NewItem(subpath, info, CachePrintNew, RootDir, CrawlerParseMIME)
-		dirItem.Children = append(dirItem.Children, entryItem)
-	}
-
-	// Add the directory to the cache after all of its children have been processed
-	dc.cache.Add(StripRootDir(dir, RootDir), dirItem)
-
-	// If the directory is not the root directory, update the parent directory's Children field
-	if dir != RootDir {
-		parentDir := filepath.Dir(dir)
-		parentItem, found := dc.cache.Get(StripRootDir(parentDir, RootDir))
-		if found {
-			// Remove the old version of the directory from the parent's Children field
-			for i, child := range parentItem.Children {
-				if child.Path == StripRootDir(dir, RootDir) {
-					parentItem.Children = append(parentItem.Children[:i], parentItem.Children[i+1:]...)
-					break
-				}
-			}
-			// Add the new version of the directory to the parent's Children field
-			parentItem.Children = append(parentItem.Children, dirItem)
-			// Update the parent directory in the cache
-			dc.cache.Add(StripRootDir(parentDir, RootDir), parentItem)
-		}
-	}
-
-	return nil
-}
-
-func (dc *DirectoryCrawler) Get(path string) (*data.Item, bool) {
-	return dc.cache.Get(path)
-}
-
-func (dc *DirectoryCrawler) walkDirNoRecursion(dir string, n *sync.WaitGroup, workers []*worker, numWorkers int, CachePrintNew bool, RootDir string, CrawlerParseMIME bool) error {
-	defer n.Done()
-
-	entries, err := os.ReadDir(dir)
-	if err != nil {
-		log.Errorf("CRAWLER - walkDir() failed to read directory %s: %s", dir, err)
-		return err
-	}
-
-	// Add the directory itself to the cache
-	info, err := os.Stat(dir)
-	if err != nil {
-		log.Errorf("CRAWLER - walkDir() failed to stat %s: %s", dir, err)
-		return err
-	}
-	dirItem := NewItem(dir, info, CachePrintNew, RootDir, CrawlerParseMIME)
-	dc.cache.Add(StripRootDir(dir, RootDir), dirItem)
-
-	i := 0
-	for _, entry := range entries {
-		subpath := filepath.Join(dir, entry.Name())
-		if !entry.IsDir() {
-			workers[i%numWorkers].add(subpath)
-			i++
-		}
-
-		// Add the entry to the directory's contents
-		info, err = os.Stat(subpath)
-		if err != nil {
-			log.Warnf("CRAWLER - walkDir() failed to stat subpath %s: %s", subpath, err)
-			continue
-		}
-		entryItem := NewItem(subpath, info, CachePrintNew, RootDir, CrawlerParseMIME)
-		dirItem.Children = append(dirItem.Children, entryItem)
-	}
-	return nil
-}
-
-func (dc *DirectoryCrawler) CrawlNoRecursion(path string, CachePrintNew bool, numWorkers int, RootDir string, CrawlerParseMIME bool) error {
-	if _, err := os.Stat(path); os.IsNotExist(err) {
-		// If the path doesn't exist, just silently exit
-		return nil
-	}
-
-	info, err := os.Stat(path)
-	if err != nil {
-		return err
-	}
-
-	// Increment running scans count
-	atomic.AddInt32(&runningScans, 1)
-	// Mark the scan as active
-	mapMutex.Lock()
-	activeScans[path] = true
-	mapMutex.Unlock()
-
-	if info.IsDir() {
-		// If the path is a directory, start workers and walk the directory
-		workers := make([]*worker, numWorkers)
-		for i := 0; i < numWorkers; i++ {
-			workers[i] = newWorker()
-			workers[i].start(dc, CachePrintNew, RootDir, CrawlerParseMIME)
-		}
-		var wg sync.WaitGroup
-		wg.Add(1)
-		go func() {
-			err := dc.walkDirNoRecursion(path, &wg, workers, numWorkers, CachePrintNew, RootDir, CrawlerParseMIME)
-			if err != nil {
-				log.Errorf("CRAWLER - dc.walkDirNoRecursion() in CrawlNoRecursion() returned error: %s", err)
-			}
-		}()
-		wg.Wait()
-		for _, worker := range workers {
-			worker.stop()
-		}
-	} else {
-		// If the path is a file, add it to the cache directly
-		dc.cache.Add(StripRootDir(path, RootDir), NewItem(path, info, CachePrintNew, RootDir, CrawlerParseMIME))
-	}
-
-	// Mark the scan as inactive
-	mapMutex.Lock()
-	activeScans[path] = false
-	mapMutex.Unlock()
-	// Decrement running scans count
-	atomic.AddInt32(&runningScans, -1)
-	return nil
-}
-
-// functions to get the number of running scans and workers
-
-func GetRunningScans() int32 {
-	return atomic.LoadInt32(&runningScans)
-}
-
-func GetRunningWorkers() int32 {
-	return atomic.LoadInt32(&runningWorkers)
-}
-
-// Functions to get the current directory of a worker and the active scans
-
-func GetWorkerDir(id int32) string {
-	mapMutex.Lock()
-	defer mapMutex.Unlock()
-	return workerDirs[id]
-}
-
-func GetActiveScans() map[string]bool {
-	mapMutex.Lock()
-	defer mapMutex.Unlock()
-	return activeScans
-}
@@ -5,6 +5,7 @@ import (
 	"crazyfs/data"
 	"crazyfs/logging"
 	lru "github.com/hashicorp/golang-lru/v2"
+	"os"
 	"sync"
 	"time"
 )
@@ -29,10 +30,12 @@ func StartCrawler(basePath string, sharedCache *lru.Cache[string, *data.Item], c
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		dc := NewDirectoryCrawler(sharedCache)
+
+		pool := NewWorkerPool()
+		crawler := NewDirectoryCrawler(sharedCache, pool)
 		log.Infoln("CRAWLER - Starting a crawl...")
 		start := time.Now()
-		err := dc.Crawl(basePath, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+		err := crawler.Crawl(basePath, true)
 		duration := time.Since(start).Round(time.Second)
 		if err != nil {
 			log.Warnf("CRAWLER - Crawl failed: %s", err)
@@ -58,3 +61,57 @@ func StartCrawler(basePath string, sharedCache *lru.Cache[string, *data.Item], c
 
 	return nil
 }
+
+func NewItem(path string, info os.FileInfo) *data.Item {
+	if PrintNew {
+		log = logging.GetLogger()
+		log.Debugf("CACHE - new: %s", path)
+	}
+
+	// Start processing the MIME type right away.
+	// It will run in the background while we set up the Item object.
+	ch := make(chan [2]string)
+	go AnalyzeFileMime(path, info, CrawlerParseMIME, ch)
+
+	item := itemPool.Get().(*data.Item)
+
+	// Reset fields
+	item.Path = ""
+	item.Name = ""
+	item.Size = 0
+	item.Extension = nil
+	item.Modified = ""
+	item.Mode = 0
+	item.IsDir = false
+	item.IsSymlink = false
+	item.Cached = 0
+	item.Children = item.Children[:0]
+	item.Type = nil
+
+	// Set fields
+	item.Path = StripRootDir(path, RootDir)
+	item.Name = info.Name()
+	item.Size = info.Size()
+	item.Modified = info.ModTime().UTC().Format(time.RFC3339Nano)
+	item.Mode = uint32(info.Mode().Perm())
+	item.IsDir = info.IsDir()
+	item.IsSymlink = info.Mode()&os.ModeSymlink != 0
+	item.Cached = time.Now().UnixNano() / int64(time.Millisecond)
+
+	// Get the MIME data from the background thread
+	mimeResult := <-ch // This will block until the goroutine finishes
+	ext, mimeType := mimeResult[0], mimeResult[1]
+
+	// Create pointers for mimeType and ext to allow empty JSON strings
+	var mimeTypePtr, extPtr *string
+	if mimeType != "" {
+		mimeTypePtr = &mimeType
+	}
+	if ext != "" {
+		extPtr = &ext
+	}
+	item.Extension = extPtr
+	item.Type = mimeTypePtr
+
+	return item
+}
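Two things in the rewritten NewItem carry the performance gain: Item structs are recycled through the itemPool sync.Pool declared in the new crawler file, and MIME analysis overlaps with field initialization by running in a goroutine whose result is collected at the end. A stripped-down, self-contained sketch of the overlap pattern, with hypothetical names:

	package main

	import (
		"fmt"
		"time"
	)

	// detectType stands in for the expensive MIME detection (hypothetical).
	func detectType() string {
		time.Sleep(10 * time.Millisecond)
		return "text/plain"
	}

	func main() {
		ch := make(chan string)
		go func() { ch <- detectType() }() // detection runs in the background...

		item := map[string]string{"name": "a.txt"} // ...while we do the cheap setup

		item["type"] = <-ch // block only for whatever detection time remains
		fmt.Println(item)
	}

One caveat: no itemPool.Put() appears in the hunks shown, so the pool only pays off once something returns Items to it, and because pooled Items are handed to the shared LRU cache, any future Put() must avoid recycling an Item the cache still references.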
@@ -17,33 +17,53 @@ func StripRootDir(path, RootDir string) string {
 	}
 }
 
-func GetMimeType(path string, analyze bool) (bool, string, string, error) {
-	var MIME *mimetype.MIME
-	var mimeType string
-	var ext string
+func GetFileMime(path string, analyze bool) (bool, string, string, error) {
 	var err error
 	info, err := os.Stat(path)
 	if err != nil {
 		// File does not exist
 		return false, "", "", err
 	}
-	if !info.IsDir() {
-		ext = filepath.Ext(path)
-		if analyze {
-			MIME, err = mimetype.DetectFile(path)
-			if err != nil {
-				log.Warnf("Error analyzing MIME type: %v", err)
-				return false, "", "", err
-			}
-			mimeType = MIME.String()
-		} else {
-			mimeType = mime.TypeByExtension(ext)
-		}
-	} else {
-		return true, "", ext, nil
-	}
-	if strings.Contains(mimeType, ";") {
-		mimeType = strings.Split(mimeType, ";")[0]
-	}
+	ch := make(chan [2]string)
+	go AnalyzeFileMime(path, info, analyze, ch)
+
+	// Get the MIME data from the background thread
+	mimeResult := <-ch // This will block until the goroutine finishes
+	ext, mimeType := mimeResult[0], mimeResult[1]
 	return true, mimeType, ext, nil
 }
+
+func detectMIME(path string, info os.FileInfo) string {
+	if info.Mode()&os.ModeType == 0 {
+		mimeObj, err := mimetype.DetectFile(path)
+		if err != nil {
+			log.Warnf("Error detecting MIME type: %v", err)
+			return ""
+		} else {
+			return mimeObj.String()
+		}
+	} else {
+		return ""
+	}
+}
+
+func AnalyzeFileMime(path string, info os.FileInfo, analyze bool, ch chan<- [2]string) {
+	go func() {
+		var ext string
+		var mimeType string
+		if !info.IsDir() && !(info.Mode()&os.ModeSymlink == os.ModeSymlink) {
+			if CrawlerParseMIME || analyze {
+				ext = filepath.Ext(path)
+				mimeType = detectMIME(path, info)
+			} else {
+				mimeType = mime.TypeByExtension(ext)
+			}
+			if strings.Contains(mimeType, ";") {
+				mimeType = strings.Split(mimeType, ";")[0]
+			}
+			ch <- [2]string{ext, mimeType}
+		} else {
+			ch <- [2]string{"", ""}
+		}
+	}()
+}
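Usage of the renamed helper mirrors the handler call sites earlier in this diff; a one-line sketch (the path is hypothetical):

	fileExists, mimeType, ext, err := cache.GetFileMime("/srv/files/a.txt", true)

With analyze=true the work goes through detectMIME and mimetype.DetectFile. With analyze=false (and CrawlerParseMIME unset) the else branch calls mime.TypeByExtension(ext), but ext is only assigned inside the analyze branch, so that fallback always sees an empty extension. Also, AnalyzeFileMime wraps its body in its own goroutine, so callers like NewItem that invoke it with go run two goroutines deep; harmless, but one of the two is redundant.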
@@ -23,7 +23,7 @@ func InitialCrawl(sharedCache *lru.Cache[string, *data.Item], cfg *config.Config
 	var wg sync.WaitGroup
 	cacheFull := make(chan bool, 1) // Channel to signal when cache is full
 
-	// Start worker goroutines
+	// Start Worker goroutines
 	for i := 0; i < runtime.NumCPU()*6; i++ {
 		wg.Add(1)
 		go func() {
@@ -62,8 +62,9 @@ func InitialCrawl(sharedCache *lru.Cache[string, *data.Item], cfg *config.Config
 }
 
 func crawlDir(dir string, sharedCache *lru.Cache[string, *data.Item], cacheFull chan<- bool, cfg *config.Config) {
-	dc := NewDirectoryCrawler(sharedCache)
-	err := dc.Crawl(dir, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+	pool := NewWorkerPool()
+	crawler := NewDirectoryCrawler(sharedCache, pool)
+	err := crawler.Crawl(dir, true)
 	if err != nil {
 		log.Fatalf("Crawl failed: %s", err)
 		return
@@ -24,8 +24,9 @@ func CheckAndRecache(path string, cfg *config.Config, sharedCache *lru.Cache[str
 	sem <- struct{}{} // acquire a token
 	go func() {
 		defer func() { <-sem }() // release the token when done
-		dc := NewDirectoryCrawler(sharedCache)
-		err := dc.Crawl(path, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+		pool := NewWorkerPool()
+		crawler := NewDirectoryCrawler(sharedCache, pool)
+		err := crawler.Crawl(path, true)
 		if err != nil {
 			log.Errorf("RECACHE ERROR: %s", err.Error())
 		}
@@ -39,8 +40,9 @@ func Recache(path string, cfg *config.Config, sharedCache *lru.Cache[string, *da
 	sem <- struct{}{} // acquire a token
 	go func() {
 		defer func() { <-sem }() // release the token when done
-		dc := NewDirectoryCrawler(sharedCache)
-		err := dc.Crawl(path, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+		pool := NewWorkerPool()
+		crawler := NewDirectoryCrawler(sharedCache, pool)
+		err := crawler.Crawl(path, true)
 		if err != nil {
 			log.Errorf("RECACHE ERROR: %s", err.Error())
 		}
@@ -62,7 +64,7 @@ func Recache(path string, cfg *config.Config, sharedCache *lru.Cache[string, *da
 			if err != nil {
 				log.Errorf("RECACHE ERROR: %s", err.Error())
 			} else {
-				newItem := NewItem(path, info, cfg.CachePrintNew, cfg.RootDir, cfg.CrawlerParseMIME)
+				newItem := NewItem(path, info)
 				// Create a new slice that contains all items from the Children field except the old directory
 				newChildren := make([]*data.Item, 0, len(parentItem.Children))
 				for _, child := range parentItem.Children {
@@ -79,7 +81,7 @@ func Recache(path string, cfg *config.Config, sharedCache *lru.Cache[string, *da
 			}
 		} else {
 			// If the parent directory isn't in the cache, crawl it
-			err := dc.Crawl(parentDir, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+			err := crawler.Crawl(parentDir, true)
 			if err != nil {
 				log.Errorf("RECACHE ERROR: %s", err.Error())
 			}
@@ -65,8 +65,9 @@ func StartWatcher(basePath string, sharedCache *lru.Cache[string, *data.Item], c
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				dc := NewDirectoryCrawler(sharedCache)
-				err := dc.Crawl(event.Path, cfg.CachePrintNew, cfg.CrawlWorkers, cfg.RootDir, cfg.CrawlerParseMIME)
+				pool := NewWorkerPool()
+				crawler := NewDirectoryCrawler(sharedCache, pool)
+				err := crawler.Crawl(event.Path, true)
 				if err != nil {
 					log.Warnf("WATCHER - Crawl failed: %s", err)
 				}
@@ -29,6 +29,8 @@ type Config struct {
 	RestrictedDownloadPaths []string
 	ApiSearchMaxResults int
 	ApiSearchShowChildren bool
+	CrawlerChannelBufferSize int
+	CrawlerWorkerPoolSize int
 }
 
 func LoadConfig(configFile string) (*Config, error) {
@@ -53,6 +55,8 @@ func LoadConfig(configFile string) (*Config, error) {
 	viper.SetDefault("api_search_max_results", 1000)
 	viper.SetDefault("api_search_show_children", false)
 	viper.SetDefault("http_allow_during_initial_crawl", false)
+	viper.SetDefault("crawler_channel_buffer_size", 1000)
+	viper.SetDefault("crawler_worker_pool_size", 200)
 
 	err := viper.ReadInConfig()
 	if err != nil {
@@ -89,6 +93,8 @@ func LoadConfig(configFile string) (*Config, error) {
 		RestrictedDownloadPaths: restrictedPaths,
 		ApiSearchMaxResults: viper.GetInt("api_search_max_results"),
 		ApiSearchShowChildren: viper.GetBool("api_search_show_children"),
+		CrawlerChannelBufferSize: viper.GetInt("crawler_channel_buffer_size"),
+		CrawlerWorkerPoolSize: viper.GetInt("crawler_worker_pool_size"),
 	}
 
 	if config.WatchMode != "crawl" && config.WatchMode != "watch" {
@@ -131,5 +137,9 @@ func LoadConfig(configFile string) (*Config, error) {
 		return nil, errors.New("api_search_max_results must not be less than 1")
 	}
 
+	if config.CrawlerChannelBufferSize < 1 {
+		return nil, errors.New("crawler_channel_buffer_size must not be less than 1")
+	}
+
 	return config, nil
 }
@@ -81,6 +81,12 @@ func main() {
 		log.Fatalf("Failed to load config file: %s", err)
 	}
 
+	cache.WorkerBufferSize = cfg.CrawlerChannelBufferSize
+	cache.PrintNew = cfg.CachePrintNew
+	cache.RootDir = cfg.RootDir
+	cache.CrawlerParseMIME = cfg.CrawlerParseMIME
+	cache.MaxWorkers = cfg.CrawlWorkers
+
 	sharedCache, err := lru.New[string, *data.Item](cfg.CacheSize)
 	if err != nil {
 		log.Fatal(err)