use one elastic queue for add and delete jobs, use locks for global elastic variables, reject symlinks, code cleanup, move to go 1.22

Cyberes 2024-03-17 14:25:11 -06:00
parent 92e55ca319
commit dc3b164520
28 changed files with 310 additions and 215 deletions

View File

@ -16,5 +16,5 @@ go build -v -trimpath -ldflags "-s -w -X main.VersionDate=$(date -u --iso-8601=m
if [ $? -eq 0 ]; then
chmod +x "$SCRIPT_DIR/dist/crazyfs"
echo "Finished building -> $SCRIPT_DIR/dist/crazyfs"
echo "Build Succeeded -> $SCRIPT_DIR/dist/crazyfs"
fi

View File

@ -32,7 +32,7 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
// Start a blocking non-recursive crawl.
start := time.Now()
item, err := dc.CrawlNoRecursion(fullPath)
item, err := dc.CrawlNoRecursion(fullPath, nil)
if err == nil && (os.IsNotExist(err) || item == nil) {
ReturnFake404Msg("path not found", w)
return nil
@ -49,35 +49,11 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
// Try to get the data from the cache again.
item, found := sharedcache.Cache.Get(relPath)
if !found {
// TODO: let's not re-check the disk if the file is still not in the cache. Instead, let's just assume that it doesn't exist.
// Assume that it doesn't exist.
ReturnFake404Msg("path not found", w)
// TODO: this is the old code in case this isn't the right approach.
// If the data is still not in the cache, check if the file or directory exists.
// We could check if the file exists before checking the cache but we want to limit disk reads.
//if _, err := os.Stat(fullPath); os.IsNotExist(err) {
// log.Debugf("File not in cache: %s", fullPath)
// // If the file or directory does not exist, return a 404 status code and a message
// ReturnFake404Msg("file or directory not found", w)
// return nil
//} else if err != nil {
// // If there was an error checking if the file or directory exists, return a 500 status code and the error
// log.Errorf("LIST - %s", err.Error())
// Return500Msg(w)
// return nil
//}
}
// If the cache item is still nil, return a 500.
if item == nil {
log.Errorf("HELPERS:HandleFileNotFound:Crawl - Failed to find %s and did not return a 404", relPath)
Return500Msg(w)
return nil
}
// TODO: is this needed? I don't remember why this function is here. It seems to do another recursive crawl
//cache.CheckAndRecache(fullPath)
duration := time.Since(start).Round(time.Second)
log.Debugf(`HandleFileNotFound:Crawl - Took %s to find the missing path "%s"`, duration, relPath)
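
Net effect of this hunk: after the blocking crawl, a second cache lookup decides the response, and a miss is now treated as a definitive 404 with no extra disk stat. A minimal sketch of that flow; cacheGet, crawl, and Item are stand-ins for the project's sharedcache and crawler helpers, not its real API:

package main

import "fmt"

type Item struct{ Path string }

// Stand-ins for sharedcache.Cache.Get and dc.CrawlNoRecursion.
func cacheGet(relPath string) (*Item, bool) { return nil, false }
func crawl(fullPath string) (*Item, error)  { return &Item{Path: fullPath}, nil }

// resolve returns the item and an HTTP-style status code.
func resolve(relPath, fullPath string) (*Item, int) {
	if item, ok := cacheGet(relPath); ok {
		return item, 200
	}
	item, err := crawl(fullPath) // blocking, non-recursive crawl
	if err != nil || item == nil {
		return nil, 404 // the crawl is authoritative: assume the path is gone
	}
	if cached, ok := cacheGet(relPath); ok {
		return cached, 200
	}
	// Crawled successfully but never landed in the cache: report 404,
	// mirroring the new "assume it doesn't exist" behavior.
	return nil, 404
}

func main() {
	_, status := resolve("/a.txt", "/srv/files/a.txt")
	fmt.Println("status:", status)
}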

View File

@ -143,6 +143,5 @@ func wrongMethod(expectedMethod string, next AppHandler) AppHandler {
"code": http.StatusBadRequest,
"error": fmt.Sprintf("Received a %s request on a %s endpoint", r.Method, expectedMethod),
})
return
}
}

src/cache/crawler.go (vendored, 76 lines changed)
View File

@ -1,61 +1,57 @@
package cache
import (
"context"
"crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals"
"crazyfs/sharedcache"
"sync"
"sync/atomic"
"time"
)
func StartCrawler() error {
var wg sync.WaitGroup
go startCrawl(&wg)
ticker := time.NewTicker(60 * time.Second)
go logCacheStatus("CACHE STATUS", ticker, log.Debugf)
go startRecurringCrawl()
return nil
}
func startCrawl(wg *sync.WaitGroup) {
ticker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
defer ticker.Stop()
// startRecurringCrawl never exits.
func startRecurringCrawl() {
crawlTicker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
printTicker := time.NewTicker(60 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go logCacheStatus("CACHE STATUS", printTicker, log.Debugf, ctx)
time.Sleep(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
for range ticker.C {
wg.Add(1)
go func() {
defer wg.Done()
dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
log.Infoln("CRAWLER - Recurring - Starting a crawl...")
start := time.Now()
err := dc.Crawl(config.GetConfig().RootDir, nil)
if err != nil {
log.Warnf("CRAWLER - Recurring - Crawl failed: %s", err)
} else {
duration := time.Since(start).Round(time.Second)
log.Infof("CRAWLER - Recurring - Crawl completed in %s", duration)
log.Debugf("%d/%d items in the cache.", len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize)
}
}()
}
}
func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, args ...interface{})) {
defer ticker.Stop()
for range ticker.C {
if !config.GetConfig().ElasticsearchSyncEnable || InitialCrawlInProgress {
logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr,
msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls())
for range crawlTicker.C {
dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
log.Infoln("CRAWLER - Recurring - Starting a crawl...")
start := time.Now()
err := dc.Crawl(config.GetConfig().RootDir, nil)
if err != nil {
log.Warnf("CRAWLER - Recurring - Crawl failed: %s", err)
} else {
logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d. Busy Elastic delete workers: %d. Elastic deletes queued: %d"
logFn(logStr,
msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls(), globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
duration := time.Since(start).Round(time.Second)
log.Infof("CRAWLER - Recurring - Crawl completed in %s", duration)
log.Debugf("%d/%d items in the cache.", len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize)
}
}
}
func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, args ...interface{}), ctx context.Context) {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !InitialCrawlInProgress {
logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr,
msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls())
}
}
}
}
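
The refactor above moves the status logger onto a context so the goroutine that owns it can shut it down. A compact, self-contained sketch of the same ticker-plus-context pattern (generic names, not the project's API):

package main

import (
	"context"
	"fmt"
	"time"
)

// logStatus prints on every tick until the context is cancelled.
// This mirrors the shape of logCacheStatus after the refactor.
func logStatus(ctx context.Context, ticker *time.Ticker) {
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case t := <-ticker.C:
			fmt.Println("status at", t.Format(time.Kitchen))
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go logStatus(ctx, time.NewTicker(500*time.Millisecond))
	time.Sleep(2 * time.Second)
	cancel() // stops the logger goroutine; its defer releases the ticker
	time.Sleep(100 * time.Millisecond)
}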

View File

@ -1,6 +1,7 @@
package cache
import (
"context"
"crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals"
@ -24,7 +25,9 @@ func InitialCrawl() {
log.Infof("CRAWLER:Inital - Starting the crawl for %s", config.GetConfig().RootDir)
ticker := time.NewTicker(3 * time.Second)
go logCacheStatus("INITIAL CRAWL", ticker, log.Infof)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go logCacheStatus("INITIAL CRAWL", ticker, log.Infof, ctx)
InitialCrawlLock.Lock()
InitialCrawlInProgress = true

View File

@ -73,7 +73,12 @@ func Recache(path string) error {
log.Errorf("CACHE:Recache - %s", err.Error())
return
} else {
newItem := cacheitem.NewItem(path, info)
newItem, err := cacheitem.NewItem(path, info)
if err != nil {
log.Errorf("CACHE:Recache - %s", err.Error())
return
}
// Create a new slice that contains all items from the Children field except the old directory
newChildren := make([]string, 0, len(parentItem.Children))
for _, child := range parentItem.Children {
@ -91,7 +96,7 @@ func Recache(path string) error {
} else if !file.PathOutsideRoot(parentDir) {
// If the parent directory isn't in the cache, crawl it.
log.Infof("CACHE:Recache - Crawling parent directory since it isn't in the cache yet: %s", parentDir)
_, err := dc.CrawlNoRecursion(parentDir)
_, err := dc.CrawlNoRecursion(parentDir, nil)
if err != nil {
log.Errorf("CACHE:Recache - %s", err.Error())
return

View File

@ -3,26 +3,26 @@ package cacheitem
import (
"crazyfs/config"
"crazyfs/file"
"errors"
"fmt"
"os"
"path/filepath"
"time"
)
func NewItem(fullPath string, info os.FileInfo) *Item {
func NewItem(fullPath string, info os.FileInfo) (*Item, error) {
file.RetardCheck(fullPath)
if info.Mode()&os.ModeSymlink != 0 {
return nil, file.RejectSymlinkErr
}
if config.GetConfig().CachePrintNew {
log.Debugf("CACHEITEM:New - New cache item: %s", fullPath)
}
pathExists, _ := file.PathExists(fullPath)
if !pathExists {
if info.Mode()&os.ModeSymlink > 0 {
// Ignore symlinks
return nil
} else {
log.Warnf("CACHEITEM:New - Path does not exist: %s", fullPath)
return nil
}
return nil, errors.New(fmt.Sprintf("path does not exist: %s", fullPath))
}
var mimeType string
@ -31,8 +31,8 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
var err error
if !info.IsDir() {
var mimePath string
if config.FollowSymlinks && info.Mode()&os.ModeSymlink > 0 {
mimePath, _ = filepath.EvalSymlinks(fullPath)
if info.Mode()&os.ModeSymlink != 0 {
return nil, file.RejectSymlinkErr
} else {
mimePath = fullPath
}
@ -53,8 +53,7 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
// Catch any errors caused by detecting the MIME.
if os.IsNotExist(err) {
log.Warnf("CACHEITEM:New - Cannot detect MIME: path does not exist: %s", fullPath)
return nil
return nil, errors.New(fmt.Sprintf("Cannot detect MIME: path does not exist: %s", fullPath))
} else if err != nil {
log.Warnf("CACHEITEM:New - Error detecting MIME type of file %s - %v", fullPath, err)
}
@ -82,5 +81,5 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
Children: make([]string, 0),
MimeType: mimeTypePtr,
Encoding: &encoding,
}
}, nil
}

View File

@ -1,44 +0,0 @@
package crawlers
import (
"crazyfs/config"
"crazyfs/elastic"
"crazyfs/globals"
"crazyfs/sharedcache"
"crazyfs/workers"
"sync/atomic"
)
func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
if globals.ElasticCrawlers != nil {
panic("ElasticCrawlers has already been defined!")
}
workers := workers.InitializeWorkers(elasticDeleteWorker)
d := &globals.DcWorkers{}
d.Queue = workers.Queue
d.BusyWorkers = workers.BusyWorkers
globals.ElasticCrawlers = d
log.Debugf("CRAWLERS - Started %d elastic sync workers.", config.GetConfig().ElasticsearchSyncThreads)
return d
}
func elasticDeleteWorker(w *workers.CrawlWorkers) {
for {
job := w.Queue.GetJob()
atomic.AddInt32(&w.BusyWorkers, 1)
if _, ok := sharedcache.Cache.Get(job.StartPath); !ok {
// If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
e := *job.Extra
key := e["key"].(string)
err := elastic.DeleteFromElasticsearch(key)
if err != nil {
log.Errorf(`ELASTIC:Delete - Error deleting key "%s" - %s`, key, err)
} else {
log.Debugf(`ELASTIC:Delete - Deleted path: "%s"`, job.StartPath)
}
}
atomic.AddInt32(&w.BusyWorkers, -1)
}
}

View File

@ -1,12 +0,0 @@
package crawlers
import (
"crazyfs/logging"
"github.com/sirupsen/logrus"
)
var log *logrus.Logger
func init() {
log = logging.GetLogger()
}

View File

@ -4,7 +4,7 @@ import (
"crazyfs/api"
"crazyfs/cache"
"crazyfs/config"
"crazyfs/crawlers"
"crazyfs/directorycrawler"
"crazyfs/elastic"
"crazyfs/logging"
"crazyfs/sharedcache"
@ -129,7 +129,6 @@ func main() {
elastic.Enabled = false
} else {
elastic.ElasticClient = es
crawlers.InitializeElasticCrawlerWorkers()
// This could take a minute, so we do this in the background while we crawl.
go func() {
elastic.EnableElasticsearchConnection()
@ -151,7 +150,7 @@ func main() {
log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable && !cliArgs.disableElasticSync)
crawlers.InitializeDirectoryCrawlerWorkers()
directorycrawler.InitializeDirectoryCrawlerWorkers()
cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)

View File

@ -56,15 +56,14 @@ func (dc *DirectoryCrawler) CleanupDeletedFiles(path string) {
})
}
func (dc *DirectoryCrawler) AddCacheItem(fullPath string, info os.FileInfo) {
func (dc *DirectoryCrawler) addCacheItem(fullPath string, info os.FileInfo) error {
strippedPath := file.StripRootDir(fullPath)
item := cacheitem.NewItem(fullPath, info)
if item != nil {
// Sometimes cacheitem.NewItem will return nil if the path fails its checks
sharedcache.Cache.Add(strippedPath, item)
} else {
//log.Errorf("NewItem returned nil for %s", fullPath)
item, err := cacheitem.NewItem(fullPath, info)
if err != nil {
return err
}
sharedcache.Cache.Add(strippedPath, item)
return nil
}
func isSubpath(path, subpath string) bool {

View File

@ -1,4 +1,4 @@
package crawlers
package directorycrawler
import (
"crazyfs/config"
@ -11,26 +11,29 @@ func InitializeDirectoryCrawlerWorkers() *globals.DcWorkers {
if globals.DirectoryCrawlers != nil {
panic("DirectoryCrawlers has already been defined!")
}
workers := workers.InitializeWorkers(directoryCrawlerWorker)
dcWorkers := workers.InitializeWorkers(directoryCrawlerWorker)
d := &globals.DcWorkers{}
d.Queue = workers.Queue
d.BusyWorkers = workers.BusyWorkers
d.Queue = dcWorkers.Queue
d.BusyWorkers = dcWorkers.BusyWorkers
globals.DirectoryCrawlers = d
log.Debugf("CRAWLERS - Started %d directory crawler workers.", config.GetConfig().DirectoryCrawlers)
log.Debugf("CRAWLERS - Started %d directory crawler dc_workers.", config.GetConfig().DirectoryCrawlers)
return d
}
func directoryCrawlerWorker(w *workers.CrawlWorkers) {
for {
job := w.Queue.GetJob()
// TODO: reminder that this worker type does not support shutdown
atomic.AddInt32(&w.BusyWorkers, 1)
err := job.Walker.ProcessPath(job.StartPath)
err := job.Walker.ReadPathAndQueue(job.StartPath)
if err != nil {
log.Warnf("WORKER - %s - %s", job.StartPath, err)
log.Warnf("DirCrawlWorker - %s - %s", job.StartPath, err)
}
job.Walker.Wg.Done()
atomic.AddInt32(&w.BusyWorkers, -1)
}
}

View File

@ -8,6 +8,7 @@ import (
"crazyfs/sharedcache"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"reflect"
@ -19,7 +20,7 @@ func (dc *DirectoryCrawler) walkRecursiveFunc(fullPath string, info os.FileInfo,
file.RetardCheck(fullPath)
processErr := dc.processPath(fullPath, info)
if processErr != nil {
log.Errorf(`CRAWLER:walkRecursiveFunc - failed on "%s": %s`, fullPath, processErr)
log.Errorf(`walkRecursiveFunc failed on "%s": %s`, fullPath, processErr)
return processErr
}
return nil
@ -34,13 +35,13 @@ func (dc *DirectoryCrawler) walkNonRecursiveFunc(fullPath string, dir os.DirEntr
}
processErr := dc.processPath(fullPath, info)
if processErr != nil {
log.Errorf(`CRAWLER:walkNonRecursiveFunc - Failed on "%s": %s`, fullPath, processErr)
log.Errorf(`walkNonRecursiveFunc failed on "%s": %s`, fullPath, processErr)
return processErr
}
return nil
}
func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.FileInfo, error) error) error {
func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc filepath.WalkFunc) error {
file.RetardCheck(fullPath)
// Set default value.
@ -48,8 +49,6 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
walkFunc = dc.walkRecursiveFunc
}
// TODO: check if symlink and reject if it is
// Extract the name of the callback function.
pc := reflect.ValueOf(walkFunc).Pointer()
fn := runtime.FuncForPC(pc)
@ -73,6 +72,9 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
log.Errorf(`CRAWLER:Crawl - os.Lstat() failed on "%s": %s`, fullPath, err)
return err
}
if info.Mode()&os.ModeSymlink != 0 {
return file.RejectSymlinkErr
}
//relPath := file.StripRootDir(fullPath)
@ -92,29 +94,43 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
sharedcache.Cache.Remove(key)
}
// If the path is a directory, start a queuedwalk
// If the path is a directory, start a walk
err := queuedwalk.Walk(fullPath, config.FollowSymlinks, walkFunc, dc.queue)
if err != nil {
log.Errorf(`CRAWLER:Crawl - Crawl for "%s" failed: %s`, fullPath, err)
}
} else {
// If the path is a file, add it to the cache directly
dc.AddCacheItem(fullPath, info)
err := dc.addCacheItem(fullPath, info)
if err != nil {
return err
}
}
return nil
}
// CrawlNoRecursion crawls a file or directory without recursing into any subdirectories, and returns the result of the crawl.
func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*cacheitem.Item, error) {
func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string, walkFunc fs.WalkDirFunc) (*cacheitem.Item, error) {
file.RetardCheck(fullPath)
// TODO: check if symlink and reject if it is
// Set default value.
if walkFunc == nil {
walkFunc = dc.walkNonRecursiveFunc
}
readyToStart := dc.startCrawl(fullPath, "walkNonRecursive")
// Extract the name of the callback function.
pc := reflect.ValueOf(walkFunc).Pointer()
fn := runtime.FuncForPC(pc)
fullName := fn.Name()
parts := strings.Split(fullName, ".")
funcName := parts[len(parts)-1]
cleanFuncName := strings.TrimSuffix(funcName, "Func-fm")
readyToStart := dc.startCrawl(fullPath, cleanFuncName)
if !readyToStart {
return nil, errors.New(fmt.Sprintf(`rejecting crawl, already in progress for "%s"`, fullPath))
}
defer dc.endCrawl(fullPath, "walkNonRecursive")
defer dc.endCrawl(fullPath, cleanFuncName)
info, err := os.Lstat(fullPath)
if os.IsNotExist(err) {
@ -125,6 +141,9 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*cacheitem.Item,
log.Errorf(`CRAWLER:CrawlNoRecursion - os.Lstat() failed on "%s": %s`, fullPath, err)
return nil, err
}
if info.Mode()&os.ModeSymlink != 0 {
return nil, file.RejectSymlinkErr
}
//if !config.FollowSymlinks && info.Mode()&os.ModeSymlink > 0 {
// msg := fmt.Sprintf("CRAWL - tried to crawl a symlink (not allowed in config): %s", fullPath)
@ -158,8 +177,14 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*cacheitem.Item,
}
item, _ = sharedcache.Cache.Get(relPath)
} else {
item = cacheitem.NewItem(fullPath, info)
dc.AddCacheItem(fullPath, info)
item, err = cacheitem.NewItem(fullPath, info)
if err != nil {
return nil, err
}
err := dc.addCacheItem(fullPath, info)
if err != nil {
return nil, err
}
}
return item, nil
}

View File

@ -15,11 +15,13 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
dc.visited.Store(relPath, true)
if info.Mode().IsDir() {
dirItem := cacheitem.NewItem(fullPath, info)
dirItem, err := cacheitem.NewItem(fullPath, info)
if err != nil {
return err
}
children, err := os.ReadDir(fullPath)
if err != nil {
log.Errorf(`CRAWLER:processPath - Failed to read directory "%s": %s`, fullPath, err)
return err
}
@ -52,7 +54,10 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
}
} else {
// StartPath is a file
dc.AddCacheItem(fullPath, info)
err := dc.addCacheItem(fullPath, info)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,69 @@
package elastic
import (
"crazyfs/config"
"crazyfs/globals"
"crazyfs/sharedcache"
"crazyfs/workers"
"fmt"
"sync"
"sync/atomic"
)
// aliveWorkers is used by syncElasticsearch to know when it is safe to erase the queue.
var aliveWorkers sync.WaitGroup
func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
if globals.ElasticCrawlers != nil {
panic("ElasticCrawlers has already been defined!")
}
deleteWorkers := workers.InitializeWorkers(elasticDeleteWorker)
d := &globals.DcWorkers{}
d.Queue = deleteWorkers.Queue
d.BusyWorkers = deleteWorkers.BusyWorkers
globals.ElasticCrawlers = d
log.Debugf("CRAWLERS - Started %d Elasticsearch sync workers.", config.GetConfig().ElasticsearchSyncThreads)
return d
}
func elasticDeleteWorker(w *workers.CrawlWorkers) {
aliveWorkers.Add(1)
defer aliveWorkers.Done()
for {
job := w.Queue.GetJob()
if job.Terminate {
fmt.Println("delete worker stopping")
return
}
atomic.AddInt32(&w.BusyWorkers, 1)
if job.Extra == nil {
// Jobs without any extras are the standard Walk jobs
err := job.Walker.ReadPathAndQueue(job.StartPath)
if err != nil {
log.Warnf("ELCrawlWorker - %s - %s", job.StartPath, err)
}
job.Walker.Wg.Done()
} else {
e := *job.Extra
task := e["task"]
if task == TASKDELETE {
if _, ok := sharedcache.Cache.Get(job.StartPath); !ok {
// If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
key := e["key"].(string)
err := DeleteFromElasticsearch(key)
if err != nil {
log.Errorf(`ELASTIC:Delete - Error deleting key "%s" - %s`, key, err)
} else {
log.Debugf(`ELASTIC:Delete - Deleted path: "%s"`, job.StartPath)
}
}
} else {
panic(task)
}
}
atomic.AddInt32(&w.BusyWorkers, -1)
}
}
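
With one queue now serving both job kinds, the worker dispatches on job.Extra: nil means a standard walk job, anything else selects a task. A stripped-down sketch of that dispatch, with simplified types standing in for queuedwalk.Job:

package main

import "fmt"

const taskDelete = "delete"

type job struct {
	startPath string
	extra     *map[string]interface{}
}

// dispatch mirrors elasticDeleteWorker's branching on job.Extra.
func dispatch(j job) {
	if j.extra == nil {
		fmt.Println("walk:", j.startPath) // standard Walk job
		return
	}
	e := *j.extra
	switch e["task"] {
	case taskDelete:
		fmt.Println("delete key:", e["key"]) // stale Elastic document
	default:
		panic(e["task"]) // unknown task: fail loudly, as the worker does
	}
}

func main() {
	dispatch(job{startPath: "/docs"})
	extra := map[string]interface{}{"task": taskDelete, "key": "abc123"}
	dispatch(job{startPath: "/gone.txt", extra: &extra})
}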

View File

@ -1,9 +1,11 @@
package elastic
import (
"context"
"crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals"
"fmt"
"sync"
"time"
)
@ -38,7 +40,6 @@ func SyncThread() {
}
}
// TODO: have the workers exit when the sync job is finished
func syncElasticsearch(doFullSync bool) {
if !Enabled {
log.Debugln("ELASTIC - Disabled, not syncing.")
@ -71,22 +72,52 @@ func syncElasticsearch(doFullSync bool) {
log.Infof("ELASTIC - Started a %s sync.", syncType)
start := time.Now()
InitializeElasticCrawlerWorkers()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
defer ticker.Stop()
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
logStr := "ELASTIC - Busy Elastic delete workers: %d. Elastic deletes queued: %d"
log.Debugf(logStr, globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
}
}
}()
// Refresh the global variables for the workers.
var err error
globalPathsByKeyMutex.Lock()
globalKeysByPathMutex.Lock()
globalKeysByPath, globalPathsByKey, err = getPathsFromIndex()
globalPathsByKeyMutex.Unlock()
globalKeysByPathMutex.Unlock()
if err != nil {
log.Errorf("ELASTIC - Error retrieving keys from Elasticsearch: %s", err)
return
}
startRemoveStaleItemsFromElasticsearch(globalPathsByKey)
startRemoveStaleItemsFromElasticsearch()
dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) // TODO: replace with proper elastic queue
dc := directorycrawler.NewDirectoryCrawler(globals.ElasticCrawlers.Queue)
err = dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
if err != nil {
log.Errorf("ELASTIC - Crawl failed: %s", err)
return
}
// Shut down the elastic sync workers once we've finished.
globals.ElasticCrawlers.Queue.Terminate()
aliveWorkers.Wait()
fmt.Println("cleared ElasticCrawlers")
globals.ElasticCrawlers = nil
duration := time.Since(start)
log.Infof("ELASTIC - %s sync finished in %s", syncType, duration)
}

View File

@ -10,13 +10,17 @@ import (
"encoding/json"
"github.com/elastic/go-elasticsearch/v8/esapi"
"os"
"sync"
)
// globalKeysByPath and globalPathsByKey are globals read by the Walker callback addToElasticsearch().
// They are set only by syncElasticsearch() when a sync is started. Only one sync can run at a time.
// Globals are needed since there is no way to pass variables like this to the workers.
var globalKeysByPath map[string]string
var globalKeysByPathMutex sync.RWMutex
var globalPathsByKey map[string]string
var globalPathsByKeyMutex sync.RWMutex
// fullSync is another global variable accessed by the workers and only set by syncElasticsearch()
var fullSync bool
@ -34,20 +38,22 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
log.Errorf("ELASTIC:Add - Failed to delete \"%s\" - %s", relPath, err)
}
} else {
globalPathsByKeyMutex.RLock()
defer globalPathsByKeyMutex.RUnlock()
if _, ok := globalPathsByKey[relPath]; ok {
// Item already exists.
if fullSync {
preformAddToElasticsearch(cacheItem)
performAddToElasticsearch(cacheItem)
}
} else {
preformAddToElasticsearch(cacheItem)
performAddToElasticsearch(cacheItem)
}
}
}
return nil
}
func preformAddToElasticsearch(item *cacheitem.Item) {
func performAddToElasticsearch(item *cacheitem.Item) {
preparedItem, err := prepareCacheItem(item)
if err != nil {
log.Printf("ELASTIC:Add - Error preparing new item: %s", err)
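
The "locks for global elastic variables" part of this commit pairs each package-level map with a sync.RWMutex: the sync takes the write lock while swapping the maps in, and the worker callbacks take the read lock for lookups. A self-contained sketch of the pattern with illustrative names:

package main

import (
	"fmt"
	"sync"
)

var (
	keysByPath      map[string]string
	keysByPathMutex sync.RWMutex
)

// refresh replaces the whole map under the write lock, as
// syncElasticsearch does before a sync starts.
func refresh(fresh map[string]string) {
	keysByPathMutex.Lock()
	defer keysByPathMutex.Unlock()
	keysByPath = fresh
}

// lookup reads under the read lock, as the worker callbacks do.
func lookup(path string) (string, bool) {
	keysByPathMutex.RLock()
	defer keysByPathMutex.RUnlock()
	key, ok := keysByPath[path]
	return key, ok
}

func main() {
	refresh(map[string]string{"/a.txt": "doc-1"})
	if key, ok := lookup("/a.txt"); ok {
		fmt.Println("key:", key)
	}
}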

View File

@ -16,17 +16,22 @@ type DeleteJob struct {
Path string
}
func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) {
const (
TASKADD = "add"
TASKDELETE = "delete"
)
func startRemoveStaleItemsFromElasticsearch() {
log.Debugln("ELASTIC:Delete - Checking for deleted items that need to be removed from Elastic...")
// TODO: use waitgroups here so we know when all the jobs are done and we can erase globalKeysByPath and globalPathsByKey
globalPathsByKeyMutex.RLock()
defer globalPathsByKeyMutex.RUnlock()
// For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache).
for path, key := range pathsByKey {
for path, key := range globalPathsByKey {
job := queuedwalk.Job{
StartPath: path,
}
extra := make(map[string]interface{})
extra["task"] = TASKDELETE
extra["key"] = key
job.Extra = &extra

View File

@ -3,6 +3,8 @@ package file
import (
"crazyfs/config"
"crazyfs/logging"
"errors"
"fmt"
"github.com/gabriel-vasile/mimetype"
"github.com/sirupsen/logrus"
"mime"
@ -42,15 +44,14 @@ func GetMimeType(path string, analyze bool, passedInfo *os.FileInfo) (bool, stri
return false, "", "", err
}
if !info.IsDir() {
if info.Mode()&os.ModeSymlink != 0 && !config.FollowSymlinks {
return false, "", "", nil
if info.Mode()&os.ModeSymlink != 0 {
return false, "", "", RejectSymlinkErr
}
ext = filepath.Ext(path)
if analyze {
MIME, err = mimetype.DetectFile(path)
if err != nil {
log.Errorf("FILE:GetMimeType - Error analyzing MIME type: %v", err)
return false, "", "", err
return false, "", "", errors.New(fmt.Sprintf("Error analyzing MIME type: %v", err))
}
mimeType = MIME.String()
} else {

View File

@ -0,0 +1,5 @@
package file
import "errors"
var RejectSymlinkErr = errors.New("symlink rejected")
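
Callers are expected to compare against this sentinel with errors.Is, as the walker does when deciding whether a failed lstat deserves a warning. A minimal usage sketch, with lstatNoSymlink standing in for the walker's lstat wrapper:

package main

import (
	"errors"
	"fmt"
	"os"
)

var RejectSymlinkErr = errors.New("symlink rejected")

// lstatNoSymlink mirrors the walker's lstat wrapper: stat without
// following links, and reject anything that is a symlink.
func lstatNoSymlink(path string) (os.FileInfo, error) {
	info, err := os.Lstat(path)
	if err != nil {
		return nil, err
	}
	if info.Mode()&os.ModeSymlink != 0 {
		return nil, RejectSymlinkErr
	}
	return info, nil
}

func main() {
	if _, err := lstatNoSymlink("/tmp"); err != nil {
		if errors.Is(err, RejectSymlinkErr) {
			fmt.Println("skipping symlink")
		} else {
			fmt.Println("lstat failed:", err)
		}
	}
}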

View File

@ -5,7 +5,6 @@ import (
)
var ElasticCrawlers *DcWorkers
var DirectoryCrawlers *DcWorkers
type DcWorkers struct {

View File

@ -1,6 +1,6 @@
module crazyfs
go 1.20
go 1.22
require (
github.com/chai2010/webp v1.1.1

View File

@ -66,6 +66,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
@ -110,6 +111,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -148,9 +150,11 @@ github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQs
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
@ -168,6 +172,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d h1:hrujxIzL1woJ7AwssoOcM/tq5JjjG2yYOc8odClEiXA=
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=

View File

@ -14,5 +14,5 @@ func Walk(root string, followSymlinks bool, walkFn filepath.WalkFunc, queue *Job
walkFunc: walkFn,
queue: queue,
}
return w.Walk("", walkFn)
return w.walk("", walkFn)
}

View File

@ -12,11 +12,13 @@ type Job struct {
StartPath string
Walker *Walker // A pointer to the shared Walker object is passed as well.
Extra *map[string]interface{}
Terminate bool
}
// JobQueue is the Queue that workers pull jobs from.
type JobQueue struct {
fifo *goconcurrentqueue.FIFO
fifo *goconcurrentqueue.FIFO
terminate bool
// Use our own condition to notify workers since `DequeueOrWaitForNextElement()` is capped at 1000 concurrent waiters.
mutex sync.RWMutex
@ -31,17 +33,29 @@ func NewJobQueue() *JobQueue {
}
// AddJob adds a job to the queue and signals the workers so they know to pick it up.
func (q *JobQueue) AddJob(job Job) {
func (q *JobQueue) AddJob(job Job) bool {
if q.terminate {
return false
}
err := q.fifo.Enqueue(job)
if err != nil {
// Some sort of logic error or timeout occurred.
panic(err)
}
q.cond.Signal()
return true
}
// GetJob is how a worker pulls a job from the queue.
func (q *JobQueue) GetJob() Job {
if q.terminate {
// Return an empty job that tells the worker to quit.
return Job{
StartPath: "",
Terminate: true,
}
}
q.mutex.Lock()
defer q.mutex.Unlock()
for q.GetQueueSize() == 0 {
@ -58,3 +72,10 @@ func (q *JobQueue) GetJob() Job {
func (q *JobQueue) GetQueueSize() int {
return q.fifo.GetLen()
}
// Terminate tells the workers to shut down and locks the queue.
// The caller is then supposed to erase the Queue variable.
func (q *JobQueue) Terminate() {
q.terminate = true
q.fifo.Lock()
}
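
The shutdown handshake: Terminate() stops the queue from handing out work, each worker that next asks for a job receives a sentinel with Terminate set and exits, and the owner waits on a WaitGroup before erasing the queue (see aliveWorkers in the elastic worker file). A condensed sketch of the handshake; the real queue blocks on a condition variable rather than returning a sentinel when empty:

package main

import (
	"fmt"
	"sync"
)

type job struct {
	path      string
	terminate bool
}

type jobQueue struct {
	mu        sync.Mutex
	jobs      []job
	terminate bool
}

func (q *jobQueue) getJob() job {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.terminate || len(q.jobs) == 0 {
		return job{terminate: true} // sentinel: worker should exit
	}
	j := q.jobs[0]
	q.jobs = q.jobs[1:]
	return j
}

func main() {
	q := &jobQueue{jobs: []job{{path: "/a"}, {path: "/b"}}}
	var alive sync.WaitGroup // analogue of aliveWorkers
	for i := 0; i < 2; i++ {
		alive.Add(1)
		go func() {
			defer alive.Done()
			for {
				j := q.getJob()
				if j.terminate {
					return
				}
				fmt.Println("processed", j.path)
			}
		}()
	}
	q.mu.Lock()
	q.terminate = true // analogue of Terminate(): stop handing out work
	q.mu.Unlock()
	alive.Wait() // safe to erase the queue once every worker has exited
}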

View File

@ -1,6 +1,7 @@
package queuedwalk
import (
"crazyfs/file"
"errors"
"fmt"
"path/filepath"
@ -28,12 +29,12 @@ func (w *Walker) addJob(job Job) {
w.queue.AddJob(job)
}
// ProcessPath processes one path.
func (w *Walker) ProcessPath(relPath string) error {
// ReadPathAndQueue reads one directory and queues a job for each child that needs crawling.
func (w *Walker) ReadPathAndQueue(relPath string) error {
fullPath := filepath.Join(w.root, relPath)
names, err := readDirNames(fullPath)
if err != nil {
log.Errorf("Walker:processPath:readDirNames - %s", err)
log.Errorf("Walker:ReadPathAndQueue:readDirNames - %s", err)
return err
}
@ -43,12 +44,15 @@ func (w *Walker) ProcessPath(relPath string) error {
info, err := w.lstat(subPath)
if err != nil {
log.Warnf("Walker:processPath - %s - %s", relPath, err)
if !errors.Is(err, file.RejectSymlinkErr) {
// Only print a warning if it's not a symlink error.
log.Warnf("Walker:ReadPathAndQueue - %s - %s", relPath, err)
}
continue
}
if info == nil {
log.Warnf("Walker:processPath - %s - %s", relPath, err)
log.Warnf("Walker:ReadPathAndQueue - %s - %s", relPath, err)
continue
}
@ -71,12 +75,16 @@ func (w *Walker) ProcessPath(relPath string) error {
// walk recursively descends into subdirectories, calling the user-defined walkFn for each file or directory
// in the tree, starting with the root directory. It is called from only one place: Walk() in queuedwalk.go
func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error {
func (w *Walker) walk(relPath string, walkFn filepath.WalkFunc) error {
// TODO: compare with filepath.WalkDir()
// Parse the beginning path.
fullPath := filepath.Join(w.root, relPath)
info, err := w.lstat(relPath)
if err != nil {
return err
}
err = w.walkFunc(fullPath, info, err)
if errors.Is(err, filepath.SkipDir) {
return nil

View File

@ -1,6 +1,7 @@
package queuedwalk
import (
"crazyfs/file"
"os"
"path/filepath"
)
@ -33,24 +34,15 @@ func readDirNames(dirname string) ([]string, error) {
return names, nil
}
// lstat is a wrapper for os.Lstat which accepts a path relative to Walker.root and also follows symlinks
// lstat is a wrapper for os.Lstat which accepts a path relative to Walker.root and also rejects symlinks
func (w *Walker) lstat(relPath string) (info os.FileInfo, err error) {
fullPath := filepath.Join(w.root, relPath)
info, err = os.Lstat(fullPath)
if err != nil {
return nil, err
}
if w.followSymlinks {
if info.Mode()&os.ModeSymlink > 0 {
fullPath, err = filepath.EvalSymlinks(fullPath)
if err != nil {
return nil, err
}
info, err = os.Lstat(fullPath)
if err != nil {
return nil, err
}
}
if info.Mode()&os.ModeSymlink != 0 {
return nil, file.RejectSymlinkErr
}
return
}

View File

@ -86,7 +86,7 @@ func NewResponseItem(cacheItem *cacheitem.Item) *ResponseItem {
// TODO: when does this get triggered?
log.Debugf(`NewResponseItem:Crawl - Not in cache, crawling: "%s" ("%s")`, child, crawlRelPath)
dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
item, err := dc.CrawlNoRecursion(crawlRelPath)
item, err := dc.CrawlNoRecursion(crawlRelPath, nil)
if err != nil {
log.Errorf("NewResponseItem:Crawl - %s", err)
continue // skip this child