redo error handling, bug fixes

This commit is contained in:
Cyberes 2024-03-17 17:25:27 -06:00
parent dc3b164520
commit 9a450571bd
26 changed files with 244 additions and 252 deletions

View File

@ -10,7 +10,8 @@ import (
"time"
)
// HandleFileNotFound if the data is not in the cache, start a new crawler
// HandleFileNotFound if the data is not in the cache, start a new crawler.
// If it encounters an error, it will return a bad status code to the HTTP client and log the error.
func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter) *cacheitem.Item {
// TODO: implement some sort of backoff or delay for repeated calls to recache the same path.

View File

@ -72,7 +72,7 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
// Perform the Elasticsearch query
resp, err := elastic.SimpleQuery(queryString, excludeElements)
if err != nil {
log.Errorf(`ROUTES:APISearch - Failed to perform Elasticsearch query "%s" - %s`, queryString, err)
log.Errorf(`ROUTES:Search - Failed to perform Elasticsearch query "%s" - %s`, queryString, err)
helpers.Return500Msg(w)
return
}
@ -81,7 +81,7 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
var respData map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&respData)
if err != nil {
log.Errorf(`ROUTES:APISearch - Failed to parse Elasticsearch response for query "%s" - %s`, queryString, err)
log.Errorf(`ROUTES:Search - Failed to parse Elasticsearch response for query "%s" - %s`, queryString, err)
helpers.Return500Msg(w)
return
}
@ -152,7 +152,7 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
}
searchDuration := time.Since(searchStart) // .Round(time.Second)
log.Debugf(`ROUTES:APISearch - %s - Query: "%s" - Results: %d - Elapsed: %d`, logging.GetRealIP(r), queryString, len(results), searchDuration)
log.Debugf(`ROUTES:Search - %s - Query: "%s" - Results: %d - Elapsed: %d`, logging.GetRealIP(r), queryString, len(results), searchDuration)
response := map[string]interface{}{
"results": results,

View File

@ -36,6 +36,22 @@ func APIAdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
elastic.FullSyncRunning.Release(1)
}
var elasticWorkers map[string]interface{}
if (refreshSyncRunning || fullSyncRunning) && globals.ElasticCrawlers != nil {
// globals.ElasticCrawlers is only set while a sync is running, so only read it in that case.
elasticWorkers = map[string]interface{}{
"busy": globals.ElasticCrawlers.BusyWorkers,
"alive": config.GetConfig().ElasticsearchSyncThreads,
"queueSize": globals.ElasticCrawlers.Queue.GetQueueSize(),
}
} else {
elasticWorkers = map[string]interface{}{
"busy": 0,
"alive": 0,
"queueSize": 0,
}
}
response := map[string]interface{}{
"crawls": map[string]interface{}{
"active": directorycrawler.GetActiveCrawls(),
@ -46,15 +62,11 @@ func APIAdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
"alive": config.GetConfig().DirectoryCrawlers,
},
"queue": map[string]interface{}{
"size": globals.DirectoryCrawlers.Queue.GetQueueSize(),
"items": globals.DirectoryCrawlers.Queue.GetQueueSize(),
},
"initialCrawlElapsed": config.InitialCrawlElapsed,
"elastic": map[string]interface{}{
"deleteWorkers": map[string]interface{}{
"busy": globals.ElasticCrawlers.BusyWorkers,
"alive": config.GetConfig().ElasticsearchSyncThreads,
"queueSize": globals.ElasticCrawlers.Queue.GetQueueSize(),
},
"workers": elasticWorkers,
"syncRunning": map[string]interface{}{
"refresh": refreshSyncRunning,
"full": fullSyncRunning,

View File

@ -6,7 +6,6 @@ import (
"crazyfs/config"
"crazyfs/file"
"encoding/json"
"fmt"
"net/http"
)
@ -31,13 +30,13 @@ func APIAdminRecache(w http.ResponseWriter, r *http.Request) {
return
}
pathArg := requestBody["path"]
relPathArg := requestBody["path"]
// Clean the path to prevent directory traversal
fullPath, errJoin := file.SafeJoin(pathArg)
traversalAttack, errTraverse := file.DetectTraversal(pathArg)
if traversalAttack || errJoin != nil {
log.Errorf("ROUTES:ADMIN:Recache - Failed to clean path: %s - error: %s - traversal attack detected: %t - traversal attack detection: %s", pathArg, errJoin, traversalAttack, errTraverse)
fullPath := file.SafeJoinRoot(relPathArg)
traversalAttack, errTraverse := file.DetectTraversal(relPathArg)
if traversalAttack {
log.Errorf(`ROUTES:ADMIN:Recache - invalid path: "%s". Error: %s`, relPathArg, errTraverse)
helpers.Return400Msg("invalid file path", w)
return
}
@ -49,7 +48,7 @@ func APIAdminRecache(w http.ResponseWriter, r *http.Request) {
// Check and re-cache the directory
err = cache.Recache(fullPath)
if err != nil {
response["message"] = fmt.Sprintf("recache failed")
response["message"] = "recache failed"
response["error"] = err.Error()
w.WriteHeader(http.StatusConflict)
log.Errorf("Admin triggered recache for %s - %s", fullPath, err)

View File

@ -17,20 +17,20 @@ func APIDownload(w http.ResponseWriter, r *http.Request) {
return
}
pathArg := r.URL.Query().Get("path")
if pathArg == "" {
relPathArg := r.URL.Query().Get("path")
if relPathArg == "" {
helpers.Return400Msg("missing path", w)
return
}
paths := strings.Split(pathArg, ",")
paths := strings.Split(relPathArg, ",")
var cleanPaths []string
if len(paths) > 1 {
for _, path := range paths {
cleanPath, errJoin := file.SafeJoin(path)
cleanPath := file.SafeJoinRoot(path)
traversalAttack, errTraverse := file.DetectTraversal(path)
if traversalAttack || errJoin != nil {
log.Errorf("ROUTES:Download - Failed to clean path: %s - error: %s - traversal attack detected: %t - traversal attack detection: %s", path, errJoin, traversalAttack, errTraverse)
if traversalAttack {
log.Errorf(`ROUTES:Download - invalid path: "%s". Error: %s`, path, errTraverse)
helpers.Return400Msg("invalid file path", w)
return
}
@ -50,10 +50,10 @@ func APIDownload(w http.ResponseWriter, r *http.Request) {
}
// Single file or directory
fullPath, errJoin := file.SafeJoin(pathArg)
traversalAttack, errTraverse := file.DetectTraversal(pathArg)
if traversalAttack || errJoin != nil {
log.Errorf("ROUTES:Download - Failed to clean path: %s - error: %s - traversal attack detected: %t - traversal attack detection: %s", pathArg, errJoin, traversalAttack, errTraverse)
fullPath := file.SafeJoinRoot(relPathArg)
traversalAttack, errTraverse := file.DetectTraversal(relPathArg)
if traversalAttack {
log.Errorf(`ROUTES:Download - invalid path: "%s". Error: %s`, relPathArg, errTraverse)
helpers.Return400Msg("invalid file path", w)
return
}
@ -97,7 +97,7 @@ func APIDownload(w http.ResponseWriter, r *http.Request) {
var mimeType string
var err error
if item.MimeType == nil { // only if the MIME type of this item has not been set yet
_, mimeType, _, err = file.GetMimeType(fullPath, true, nil)
_, mimeType, _, err = file.GetMimeType(fullPath, true)
if err != nil {
log.Errorf("ROUTES:Download - Error detecting MIME type: %v", err)
} else if mimeType != "" {

View File

@ -16,8 +16,8 @@ func APIList(w http.ResponseWriter, r *http.Request) {
return
}
pathArg := r.URL.Query().Get("path")
if pathArg == "" {
relPathArg := r.URL.Query().Get("path")
if relPathArg == "" {
helpers.Return400Msg("path parameter is required", w)
return
}
@ -35,10 +35,10 @@ func APIList(w http.ResponseWriter, r *http.Request) {
return
}
fullPath, errJoin := file.SafeJoin(pathArg)
traversalAttack, errTraverse := file.DetectTraversal(pathArg)
if traversalAttack || errJoin != nil {
log.Errorf("ROUTES:List - Failed to clean path: %s - error: %s - traversal attack detected: %t - traversal attack detection: %s", pathArg, errJoin, traversalAttack, errTraverse)
fullPath := file.SafeJoinRoot(relPathArg)
traversalAttack, errTraverse := file.DetectTraversal(relPathArg)
if traversalAttack {
log.Errorf(`ROUTES:Download - invalid path: "%s". Error: %s`, relPathArg, errTraverse)
helpers.Return400Msg("invalid file path", w)
return
}
@ -62,21 +62,20 @@ func APIList(w http.ResponseWriter, r *http.Request) {
return
} else {
// Only update the mime in the cache if it hasn't been set already.
// TODO: need to make sure that when a re-crawl is triggered, the MimeType is set back to nil
if cacheItem.MimeType == nil {
fileExists, mimeType, ext, err := file.GetMimeType(fullPath, true, nil)
if !fileExists {
helpers.ReturnFake404Msg("file not found", w)
}
fileExists, mimeType, ext, err := file.GetMimeType(fullPath, true)
if err != nil {
log.Warnf("ROUTES:List - Error detecting MIME type: %v", err)
helpers.Return500Msg(w)
return
}
// Update the original cached cacheitem's MIME in the sharedCache
if !fileExists {
helpers.ReturnFake404Msg("file not found", w)
}
// Update the original cached item's MIME in the sharedCache.
cacheItem.MimeType = &mimeType
cacheItem.Extension = &ext
sharedcache.Cache.Add(relPath, cacheItem) // take the address of cacheitem
sharedcache.Cache.Add(relPath, cacheItem) // take the address of item
}
}
}

View File

@ -6,7 +6,6 @@ import (
"crazyfs/cache"
"crazyfs/config"
"crazyfs/file"
"crazyfs/logging"
"crazyfs/sharedcache"
"fmt"
"github.com/disintegration/imaging"
@ -17,23 +16,23 @@ import (
"image/color"
"image/png"
"net/http"
"path/filepath"
"strings"
)
func APIThumbnail(w http.ResponseWriter, r *http.Request) {
if cache.InitialCrawlInProgress && !config.GetConfig().HttpAllowDuringInitialCrawl {
helpers.HandleRejectDuringInitialCrawl(w)
returnDummyPNG(w)
return
}
log := logging.GetLogger()
relPath := file.StripRootDir(filepath.Join(config.GetConfig().RootDir, r.URL.Query().Get("path")))
relPath = strings.TrimSuffix(relPath, "/")
fullPath := filepath.Join(config.GetConfig().RootDir, relPath)
// Validate the path arg.
relPathArg := r.URL.Query().Get("path")
if relPathArg == "" {
helpers.Return400Msg("path parameter is required", w)
return
}
// Validate args before doing any operations
// Validate height and width args.
width, err := getPositiveIntFromQuery(r, "width")
if err != nil {
helpers.Return400Msg("height and width must both be positive numbers", w)
@ -45,6 +44,7 @@ func APIThumbnail(w http.ResponseWriter, r *http.Request) {
return
}
// Validate the quality arg.
pngQuality, err := getPositiveIntFromQuery(r, "quality")
if err != nil {
helpers.Return400Msg("quality must be a positive number", w)
@ -54,6 +54,7 @@ func APIThumbnail(w http.ResponseWriter, r *http.Request) {
pngQuality = 50
}
// Validate the scale args.
autoScale := r.URL.Query().Get("auto") != ""
square := r.URL.Query().Get("square") != ""
if (width != 0 && height != 0) && (width != height) {
@ -61,33 +62,42 @@ func APIThumbnail(w http.ResponseWriter, r *http.Request) {
return
}
// Try to get the data from the cache
// Do path operations last since most of our CPU and memory usage comes from string operations.
fullPath := file.SafeJoinRoot(relPathArg) // Form the full path first, which also cleans it.
pathIsSafe, errTraverse := file.DetectTraversal(fullPath) // Verify that this isn't an invalid path.
if !pathIsSafe {
log.Errorf(`ROUTES:Download - invalid path: "%s". Error: %s`, fullPath, errTraverse)
helpers.Return400Msg("invalid path", w)
return
}
relPath := file.StripRootDir(fullPath) // Then re-create the relative path from the clean path.
// Try to get the item from the cache.
item, found := sharedcache.Cache.Get(relPath)
if !found {
item = helpers.HandleFileNotFound(relPath, fullPath, w)
}
if item == nil {
returnDummyPNG(w)
helpers.Return400Msg("file not found", w)
return
}
if item.IsDir {
helpers.Return400Msg("that's a directory", w)
return
}
// Get the MIME type of the file
fileExists, mimeType, ext, err := file.GetMimeType(fullPath, true, nil)
fileExists, mimeType, ext, err := file.GetMimeType(fullPath, true)
if !fileExists {
helpers.Return400Msg("file not found", w)
return
}
if err != nil {
log.Errorf("ROUTES:Thumb - Error detecting MIME type: %v", err)
returnDummyPNG(w)
log.Warnf(`ROUTES:Thumb - Error detecting MIME type for "%s". %v`, fullPath, err)
helpers.Return500Msg(w)
return
}
// Update the cacheitem's MIME in the sharedCache
// Update the item's MIME in the cache.
item.MimeType = &mimeType
item.Extension = &ext
sharedcache.Cache.Add(relPath, item)
@ -101,8 +111,8 @@ func APIThumbnail(w http.ResponseWriter, r *http.Request) {
// Convert the image to a PNG
imageBytes, err := file.ConvertToPNG(fullPath, mimeType)
if err != nil {
log.Warnf("ROUTES:Thumb - Error converting %s to PNG: %v", fullPath, err)
returnDummyPNG(w)
log.Warnf(`ROUTES:Thumb - Error converting "%s". %v`, fullPath, err)
helpers.Return500Msg(w)
return
}
@ -110,22 +120,23 @@ func APIThumbnail(w http.ResponseWriter, r *http.Request) {
var img image.Image
img, err = png.Decode(bytes.NewReader(imageBytes))
if err != nil {
log.Warnf("ROUTES:Thumb - Error decoding %s image data: %v", fullPath, err)
returnDummyPNG(w)
log.Warnf(`ROUTES:Thumb - Error decoding "%s". %v`, fullPath, err)
helpers.Return500Msg(w)
return
}
// Resize the image
img, err = resizeImage(img, width, height, square, autoScale)
if err != nil {
helpers.Return400Msg(err.Error(), w)
log.Warnf(`ROUTES:Thumb - Error resizing "%s". %v`, fullPath, err)
helpers.Return500Msg(w)
return
}
buf, err := file.CompressPNGFile(img, pngQuality)
if err != nil {
log.Warnf("ROUTES:Thumb - Error compressing %s to PNG: %v", fullPath, err)
returnDummyPNG(w)
log.Warnf(`ROUTES:Thumb - Error compressing "%s". %v`, fullPath, err)
helpers.Return500Msg(w)
return
}
@ -186,7 +197,7 @@ func resizeImage(img image.Image, width, height int, square, autoScale bool) (im
size = helpers.Max(img.Bounds().Dx(), img.Bounds().Dy())
}
// First, make the image square by scaling the smallest dimension to the larget size
// First, make the image square by scaling the smallest dimension to the largest size.
if img.Bounds().Dx() > img.Bounds().Dy() {
width = 0
height = size
@ -196,7 +207,7 @@ func resizeImage(img image.Image, width, height int, square, autoScale bool) (im
}
resized := resize.Resize(uint(width), uint(height), img, resize.Lanczos3)
// Then crop the image to the target size
// Then crop the image to the target size.
img = imaging.CropCenter(resized, size, size)
} else {
if width == 0 && height == 0 {
@ -211,12 +222,12 @@ func resizeImage(img image.Image, width, height int, square, autoScale bool) (im
height = 300
}
} else {
// Don't auto-resize because this endpoint can also be used for simply reducing the quality of an image
// Don't auto-resize because this endpoint can also be used for simply reducing the quality of an image.
width = img.Bounds().Dx()
height = img.Bounds().Dy()
}
} else if width == 0 {
// If only width is provided, calculate the height based on the image's aspect ratio
// If only height is provided, calculate the width based on the image's aspect ratio.
width = img.Bounds().Dx() * height / img.Bounds().Dy()
} else if height == 0 {
height = img.Bounds().Dy() * width / img.Bounds().Dx()

19
src/cache/crawler.go vendored
View File

@ -18,25 +18,32 @@ func StartCrawler() error {
// startRecurringCrawl never exits.
func startRecurringCrawl() {
crawlTicker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
printTicker := time.NewTicker(60 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go logCacheStatus("CACHE STATUS", printTicker, log.Debugf, ctx)
time.Sleep(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
i := 0
for range crawlTicker.C {
dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
log.Infoln("CRAWLER - Recurring - Starting a crawl...")
start := time.Now()
err := dc.Crawl(config.GetConfig().RootDir, nil)
if err != nil {
log.Warnf("CRAWLER - Recurring - Crawl failed: %s", err)
if i == 0 {
// Exit if we failed to crawl on the first recurrence.
log.Fatalf("CRAWLER - Recurring - Crawl failed: %s", err)
} else {
log.Errorf("CRAWLER - Recurring - Crawl failed: %s", err)
}
} else {
duration := time.Since(start).Round(time.Second)
log.Infof("CRAWLER - Recurring - Crawl completed in %s", duration)
log.Debugf("%d/%d items in the cache.", len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize)
}
i++
}
}
@ -47,11 +54,9 @@ func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, a
case <-ctx.Done():
return
case <-ticker.C:
if !InitialCrawlInProgress {
logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr,
msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls())
}
logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr,
msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls())
}
}
}

13
src/cache/initial.go vendored
View File

@ -5,7 +5,6 @@ import (
"crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals"
"crazyfs/logging"
"sync"
"time"
)
@ -16,14 +15,9 @@ var InitialCrawlInProgress bool
var InitialCrawlLock sync.RWMutex
func init() {
InitialCrawlInProgress = false
}
func InitialCrawl() {
log = logging.GetLogger()
log.Infof("CRAWLER:Inital - Starting the crawl for %s", config.GetConfig().RootDir)
func InitialCrawl() error {
ticker := time.NewTicker(3 * time.Second)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -32,13 +26,12 @@ func InitialCrawl() {
InitialCrawlLock.Lock()
InitialCrawlInProgress = true
dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
//start := time.Now()
err := dc.Crawl(config.GetConfig().RootDir, nil)
if err != nil {
log.Errorf("CRAWLER:Inital - failed: %s", err)
return err
}
InitialCrawlInProgress = false
InitialCrawlLock.Unlock()
ticker.Stop()
//log.Infof("INITIAL CRAWL - finished the initial crawl in %s", time.Since(start).Round(time.Second))
return nil
}

View File

@ -19,7 +19,7 @@ func InitRecacheSemaphore(limit int) {
sem = make(chan struct{}, limit)
}
func CheckAndRecache(path string) {
func CheckAndRecacheBG(path string) {
item, found := sharedcache.Cache.Get(path)
if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 {
log.Debugf("CACHE:Recache - re-caching: %s", path)

2
src/cache/search.go vendored
View File

@ -9,6 +9,7 @@ import (
"strings"
)
// TODO: this is old and probably does not work
func SearchLRU(queryString string, excludeElements []string, limitResults int) []*cacheitem.Item {
results := make([]*cacheitem.Item, 0)
@ -41,6 +42,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int) [
return results
}
// TODO: this is old and probably does not work
func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *cacheitem.Item) {
// Acquire a token
sem <- struct{}{}

View File

@ -38,9 +38,9 @@ func NewItem(fullPath string, info os.FileInfo) (*Item, error) {
}
if config.GetConfig().CrawlerParseMIME {
_, mimeType, ext, err = file.GetMimeType(mimePath, true, &info)
_, mimeType, ext, err = file.GetMimeType(mimePath, true)
} else {
_, mimeType, ext, err = file.GetMimeType(mimePath, false, &info)
_, mimeType, ext, err = file.GetMimeType(mimePath, false)
}
if config.GetConfig().CrawlerParseEncoding {

View File

@ -165,9 +165,12 @@ func main() {
log.Infof("Server started on port %s", cfg.HTTPPort)
if cliArgs.initialCrawl || cfg.InitialCrawl {
log.Infoln("Preforming initial crawl...")
log.Infof(`Preforming initial crawl for "%s"`, config.GetConfig().RootDir)
start := time.Now()
cache.InitialCrawl()
err := cache.InitialCrawl()
if err != nil {
log.Fatalf("Inital crawl failed: %s", err)
}
duration := time.Since(start).Round(time.Second)
keys := sharedcache.Cache.Keys()
config.InitialCrawlElapsed = int(duration.Seconds())

View File

@ -148,9 +148,9 @@ func GetActiveCrawls() map[string]*ActiveCrawl {
// GetFinishedCrawls returns a new slice holding the elements of finishedCrawls.
// The elements are copied while finishedCrawlsMutex is held for reading.
func GetFinishedCrawls() []FinishedCrawl {
finishedCrawlsMutex.RLock()
defer finishedCrawlsMutex.RUnlock()
finishedCrawlsCopy := make([]FinishedCrawl, 0, maxFinishedCrawls)
for k, v := range finishedCrawls {
finishedCrawlsCopy[k] = v
// Allocate with length len(finishedCrawls) so the indexed assignments below stay in range.
finishedCrawlsCopy := make([]FinishedCrawl, len(finishedCrawls))
for i, v := range finishedCrawls {
finishedCrawlsCopy[i] = v
}
return finishedCrawlsCopy
}

View File

@ -16,21 +16,19 @@ func InitializeDirectoryCrawlerWorkers() *globals.DcWorkers {
d.Queue = dcWorkers.Queue
d.BusyWorkers = dcWorkers.BusyWorkers
globals.DirectoryCrawlers = d
log.Debugf("CRAWLERS - Started %d directory crawler dc_workers.", config.GetConfig().DirectoryCrawlers)
log.Debugf("CRAWLERS - Started %d directory crawler workers.", config.GetConfig().DirectoryCrawlers)
return d
}
func directoryCrawlerWorker(w *workers.CrawlWorkers) {
// Reminder that this worker type does not support shutdown
for {
job := w.Queue.GetJob()
// TODO: reminder that this worker type does not support shutdown
atomic.AddInt32(&w.BusyWorkers, 1)
err := job.Walker.ReadPathAndQueue(job.StartPath)
if err != nil {
log.Warnf("DirCrawlWorker - %s - %s", job.StartPath, err)
log.Warnf(`DirCrawlWorker:ReadPathAndQueue - error for "%s" - %s`, job.StartPath, err)
}
job.Walker.Wg.Done()

View File

@ -20,7 +20,6 @@ func (dc *DirectoryCrawler) walkRecursiveFunc(fullPath string, info os.FileInfo,
file.RetardCheck(fullPath)
processErr := dc.processPath(fullPath, info)
if processErr != nil {
log.Errorf(`walkRecursiveFunc failed on "%s": %s`, fullPath, processErr)
return processErr
}
return nil
@ -30,12 +29,10 @@ func (dc *DirectoryCrawler) walkNonRecursiveFunc(fullPath string, dir os.DirEntr
file.RetardCheck(fullPath)
info, infoErr := dir.Info()
if infoErr != nil {
log.Errorf(`CRAWLER:walkNonRecursiveFunc - Get info failed on "%s": %s`, fullPath, infoErr)
return infoErr
}
processErr := dc.processPath(fullPath, info)
if processErr != nil {
log.Errorf(`walkNonRecursiveFunc failed on "%s": %s`, fullPath, processErr)
return processErr
}
return nil
@ -69,7 +66,6 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc filepath.WalkFunc) e
return err
}
if err != nil {
log.Errorf(`CRAWLER:Crawl - os.Lstat() failed on "%s": %s`, fullPath, err)
return err
}
if info.Mode()&os.ModeSymlink != 0 {
@ -97,7 +93,7 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc filepath.WalkFunc) e
// If the path is a directory, start a walk
err := queuedwalk.Walk(fullPath, config.FollowSymlinks, walkFunc, dc.queue)
if err != nil {
log.Errorf(`CRAWLER:Crawl - Crawl for "%s" failed: %s`, fullPath, err)
return err
}
} else {
// If the path is a file, add it to the cache directly
@ -138,7 +134,6 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string, walkFunc fs.WalkDi
return nil, err
}
if err != nil {
log.Errorf(`CRAWLER:CrawlNoRecursion - os.Lstat() failed on "%s": %s`, fullPath, err)
return nil, err
}
if info.Mode()&os.ModeSymlink != 0 {
@ -172,7 +167,6 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string, walkFunc fs.WalkDi
err := filepath.WalkDir(fullPath, dc.walkNonRecursiveFunc)
if err != nil {
log.Errorf(`CRAWLER:CrawlNoRecursion - Crawl for "%s" failed: %s`, fullPath, err)
return nil, err
}
item, _ = sharedcache.Cache.Get(relPath)

View File

@ -53,7 +53,6 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
}
}
} else {
// StartPath is a file
err := dc.addCacheItem(fullPath, info)
if err != nil {
return err

View File

@ -40,10 +40,10 @@ func elasticDeleteWorker(w *workers.CrawlWorkers) {
atomic.AddInt32(&w.BusyWorkers, 1)
if job.Extra == nil {
// Jobs without any extras are the standard Walk jobs
// Jobs without any extras are the standard Walk jobs that add items to Elastic.
err := job.Walker.ReadPathAndQueue(job.StartPath)
if err != nil {
log.Warnf("ELCrawlWorker - %s - %s", job.StartPath, err)
log.Warnf("ELCrawlWorker:Add - %s - %s", job.StartPath, err)
}
job.Walker.Wg.Done()
} else {
@ -55,9 +55,9 @@ func elasticDeleteWorker(w *workers.CrawlWorkers) {
key := e["key"].(string)
err := DeleteFromElasticsearch(key)
if err != nil {
log.Errorf(`ELASTIC:Delete - Error deleting key "%s" - %s`, key, err)
log.Errorf(`ELCrawlWorker:Delete - Error deleting key "%s" - %s`, key, err)
} else {
log.Debugf(`ELASTIC:Delete - Deleted path: "%s"`, job.StartPath)
log.Debugf(`ELCrawlWorker:Delete - Deleted path: "%s"`, job.StartPath)
}
}
} else {

View File

@ -5,7 +5,6 @@ import (
"crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals"
"fmt"
"sync"
"time"
)
@ -53,13 +52,13 @@ func syncElasticsearch(doFullSync bool) {
var syncType string
if doFullSync {
if !FullSyncRunning.TryAcquire(1) {
log.Fatalln("ELASTIC - Failed to acquire the FullSyncRunning semaphore. This is a logic error.")
panic("ELASTIC - Failed to acquire the FullSyncRunning semaphore. This is a logic error.")
}
defer FullSyncRunning.Release(1)
syncType = "full refresh"
} else {
if !RefreshSyncRunning.TryAcquire(1) {
log.Fatalln("ELASTIC - Failed to acquire the RefreshSyncRunning semaphore. This is a logic error.")
panic("ELASTIC - Failed to acquire the RefreshSyncRunning semaphore. This is a logic error.")
}
defer RefreshSyncRunning.Release(1)
syncType = "refresh"
@ -76,7 +75,7 @@ func syncElasticsearch(doFullSync bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
go func() {
@ -85,8 +84,9 @@ func syncElasticsearch(doFullSync bool) {
case <-ctx.Done():
return
case <-ticker.C:
logStr := "ELASTIC - Busy Elastic delete workers: %d. Elastic deletes queued: %d"
log.Debugf(logStr, globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
elapsed := time.Since(start)
logStr := "ELASTIC - Sync in progress. Elapsed: %d. Busy Elastic delete workers: %d. Elastic deletes queued: %d"
log.Debugf(logStr, elapsed, globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
}
}
}()
@ -115,7 +115,6 @@ func syncElasticsearch(doFullSync bool) {
// Shut down the elastic sync workers once we've finished.
globals.ElasticCrawlers.Queue.Terminate()
aliveWorkers.Wait()
fmt.Println("cleared ElasticCrawlers")
globals.ElasticCrawlers = nil
duration := time.Since(start)
@ -123,13 +122,13 @@ func syncElasticsearch(doFullSync bool) {
}
func logElasticConnError(err error) {
log.Errorf("ELASTIC - Failed to read the index: %s", err)
log.Errorf("ELASTIC - Failed to read the index: %s", err.Error())
}
// EnableElasticsearchConnection tests the connection to Elastic and enables the backend if it's successful.
func EnableElasticsearchConnection() {
esSize, err := getElasticSize()
if err != nil {
if err != nil || esSize == -1 {
logElasticConnError(err)
Enabled = false
return

View File

@ -8,6 +8,8 @@ import (
"crazyfs/file"
"crazyfs/sharedcache"
"encoding/json"
"errors"
"fmt"
"github.com/elastic/go-elasticsearch/v8/esapi"
"os"
"sync"
@ -25,6 +27,7 @@ var globalPathsByKeyMutex sync.RWMutex
// fullSync is another global variable accessed by the workers and only set by syncElasticsearch()
var fullSync bool
// Errors from this function are received by the Elastic sync workers.
func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
relPath := file.StripRootDir(fullPath)
if !shouldExclude(relPath, config.GetConfig().ElasticsearchExcludePatterns) {
@ -35,7 +38,7 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
// Delete this item from Elastic in order to avoid any strange inconsistencies.
err := DeleteFromElasticsearch(encodeToBase64(relPath))
if err != nil {
log.Errorf("ELASTIC:Add - Failed to delete \"%s\" - %s", relPath, err)
return errors.New(fmt.Sprintf(`Failed to delete "%s" - %s`, relPath, err))
}
} else {
globalPathsByKeyMutex.RLock()
@ -43,27 +46,25 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
if _, ok := globalPathsByKey[relPath]; ok {
// Item already exists.
if fullSync {
performAddToElasticsearch(cacheItem)
return performAddToElasticsearch(cacheItem)
}
} else {
performAddToElasticsearch(cacheItem)
return performAddToElasticsearch(cacheItem)
}
}
}
return nil
}
func performAddToElasticsearch(item *cacheitem.Item) {
func performAddToElasticsearch(item *cacheitem.Item) error {
preparedItem, err := prepareCacheItem(item)
if err != nil {
log.Printf("ELASTIC:Add - Error preparing new item: %s", err)
return
return err
}
data, err := json.Marshal(preparedItem)
if err != nil {
log.Printf("ELASTIC:Add - Error marshaling new item: %s", err)
return
return err
}
req := esapi.IndexRequest{
@ -74,20 +75,20 @@ func performAddToElasticsearch(item *cacheitem.Item) {
}
res, err := req.Do(context.Background(), ElasticClient)
if err != nil {
log.Errorf("ELASTIC:Add - Error getting response: %s", err)
return
return err
}
defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Errorf("ELASTIC:Add - Error parsing the response body: %s", err)
return errors.New(fmt.Sprintf("Error parsing the response body: %s", err))
}
log.Errorf(`ELASTIC:Add - Error indexing document "%s" - Status code: %d - %s`, item.Path, res.StatusCode, e)
return errors.New(fmt.Sprintf(`Error indexing document "%s" - Status code: %d - Response: %s`, item.Path, res.StatusCode, e))
}
log.Debugf(`ELASTIC:Add - Added: "%s"`, preparedItem.Path)
return nil
}
// prepareCacheItem is used to get an item ready to insert into Elastic.

View File

@ -1,48 +1,32 @@
package file
import (
"crazyfs/config"
"crazyfs/logging"
"errors"
"fmt"
"github.com/gabriel-vasile/mimetype"
"github.com/sirupsen/logrus"
"mime"
"os"
"path/filepath"
"strings"
)
var log *logrus.Logger
func init() {
log = logging.GetLogger()
}
func GetMimeType(path string, analyze bool, passedInfo *os.FileInfo) (bool, string, string, error) {
func GetMimeType(path string, analyze bool) (bool, string, string, error) {
var MIME *mimetype.MIME
var mimeType string
var ext string
var err error
var info os.FileInfo
if config.FollowSymlinks {
info, err = os.Lstat(path)
} else {
if info == nil {
info, err = os.Stat(path)
} else {
info = *passedInfo
}
}
//if config.FollowSymlinks {
// info, err = os.Stat(path)
//} else {
info, err := os.Lstat(path)
if err != nil {
// File does not exist
return false, "", "", err
}
if info.Mode()&os.ModeSymlink != 0 {
return false, "", "", RejectSymlinkErr
}
if !info.IsDir() {
if info.Mode()&os.ModeSymlink != 0 {
return false, "", "", RejectSymlinkErr
@ -51,7 +35,7 @@ func GetMimeType(path string, analyze bool, passedInfo *os.FileInfo) (bool, stri
if analyze {
MIME, err = mimetype.DetectFile(path)
if err != nil {
return false, "", "", errors.New(fmt.Sprintf("Error analyzing MIME type: %v", err))
return false, "", "", err
}
mimeType = MIME.String()
} else {
@ -65,12 +49,3 @@ func GetMimeType(path string, analyze bool, passedInfo *os.FileInfo) (bool, stri
}
return true, mimeType, ext, nil
}
func StripRootDir(path string) string {
if path == "/" || path == config.GetConfig().RootDir || path == "" {
// Avoid erasing our path
return "/"
} else {
return strings.TrimSuffix(strings.TrimPrefix(path, config.GetConfig().RootDir), "/")
}
}

View File

@ -8,41 +8,45 @@ import (
"strings"
)
// SafeJoin Clean the provided path
func SafeJoin(pathArg string) (string, error) {
cleanPath := filepath.Join(config.GetConfig().RootDir, filepath.Clean(pathArg))
// SafeJoinRoot cleans and joins the provided relative path with the root directory path to form the full path.
func SafeJoinRoot(relPath string) string {
cleanPath := filepath.Join(config.GetConfig().RootDir, filepath.Clean(relPath))
cleanPath = strings.TrimRight(cleanPath, "/")
return cleanPath, nil
return cleanPath
}
func DetectTraversal(pathArg string) (bool, error) {
func DetectTraversal(relPath string) (bool, error) {
if strings.HasPrefix(relPath, config.GetConfig().RootDir) {
panic(fmt.Sprintf(`file.DetectTraversal() was given a path that had the root directory prefixed instead of a relative path. Make sure to call file.SafeJoinRoot() and then file.DetectTraversal(). Offending path: %s`, relPath))
}
// Remove the trailing slash so our checks always handle the same format
if pathArg != "/" {
pathArg = strings.TrimRight(pathArg, "/")
if relPath != "/" {
relPath = strings.TrimRight(relPath, "/")
}
// If the path starts with "~", a directory traversal attack is being attempted
if strings.HasPrefix(pathArg, "~") {
return true, fmt.Errorf("includes home directory: %s", pathArg)
if strings.HasPrefix(relPath, "~") {
return true, fmt.Errorf("includes home directory: %s", relPath)
}
// The file path should ALWAYS be absolute.
// For example: /Documents
if !filepath.IsAbs(pathArg) {
return true, fmt.Errorf("is not absolute path: %s", pathArg)
if !filepath.IsAbs(relPath) {
return true, fmt.Errorf("is not absolute path: %s", relPath)
}
cleanArg := filepath.Clean(pathArg)
cleanArg := filepath.Clean(relPath)
cleanPath := filepath.Join(config.GetConfig().RootDir, cleanArg)
// If the path is not within the base path, return an error
if !strings.HasPrefix(cleanPath, config.GetConfig().RootDir) {
return true, fmt.Errorf("the full path is outside the root dir: %s", pathArg)
return true, fmt.Errorf("the full path is outside the root dir: %s", relPath)
}
// If the cleaned path is not the same as the original path, a directory traversal attack is being attempted
if pathArg != cleanArg {
return true, fmt.Errorf("path. Clean modified the path arg from %s to %s", pathArg, cleanArg)
if relPath != cleanArg {
return true, fmt.Errorf("path. Clean modified the path arg from %s to %s", relPath, cleanArg)
}
return false, nil
@ -69,3 +73,12 @@ func PathExists(path string) (bool, error) {
return true, nil // File or symlink exists and is not broken
}
// StripRootDir removes the configured root directory prefix and one trailing
// slash from path, producing the path relative to the root.
//
// "/", "" and the root directory itself (with or without a trailing slash)
// all yield "/", so the result is never an empty string.
func StripRootDir(path string) string {
	if path == "/" || path == "" {
		// Don't erase the path if it's the root.
		return "/"
	}
	rootDir := config.GetConfig().RootDir
	if path == rootDir {
		return "/"
	}
	relPath := strings.TrimSuffix(strings.TrimPrefix(path, rootDir), "/")
	if relPath == "" {
		// path was the root directory followed by a trailing slash; trimming
		// both the prefix and the slash would otherwise erase it entirely.
		return "/"
	}
	return relPath
}

View File

@ -5,6 +5,8 @@ import (
"path/filepath"
)
// Inspired by https://github.com/iafan/cwalk/tree/master
// Walk is a wrapper function for the Walker object that mimics the behavior of filepath.Walk, and doesn't follow symlinks.
func Walk(root string, followSymlinks bool, walkFn filepath.WalkFunc, queue *JobQueue) error {
file.RetardCheck(root)
@ -14,5 +16,5 @@ func Walk(root string, followSymlinks bool, walkFn filepath.WalkFunc, queue *Job
walkFunc: walkFn,
queue: queue,
}
return w.walk("", walkFn)
return w.walker("")
}

View File

@ -3,13 +3,12 @@ package queuedwalk
import (
"crazyfs/file"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
)
// Walker.go is the implementation behind `queuedwalk()`, which is a filesystem queuedwalk
// using workers that pull jobs from a queue.
// Walker.go is the implementation behind `Walk()`, which is a filesystem walk using workers that pull jobs from a queue.
// ErrNotDir indicates that the path, which is being passed to a walker function, does not point to a directory.
var ErrNotDir = errors.New("not a directory")
@ -23,6 +22,49 @@ type Walker struct {
queue *JobQueue
}
// walker is the entry point of a queued walk. It checks that relPath (relative
// to w.root) is a real directory, calls the user-supplied walkFunc on it, and
// then queues a single job for that directory and blocks on w.Wg until the
// wait group is released.
// It is only called in one place: `Walk()` in Walk.go, so any error returned
// here is propagated to whatever called Walk.
// Similar to filepath.WalkDir().
func (w *Walker) walker(relPath string) error {
	info, err := w.lstat(relPath)
	if err != nil {
		return err
	}

	switch {
	case info.Mode()&os.ModeSymlink != 0:
		// Reject symlinks
		return file.RejectSymlinkErr
	case !info.Mode().IsDir():
		// If the input path was a file, reject it. We can only walk directories.
		return ErrNotDir
	}

	// Execute the walkFunc on the starting directory itself. err is always nil
	// at this point because of the early return above.
	startDir := filepath.Join(w.root, relPath)
	if cbErr := w.walkFunc(startDir, info, err); cbErr != nil {
		if errors.Is(cbErr, filepath.SkipDir) || errors.Is(cbErr, filepath.SkipAll) {
			// The walkFunc wants to skip this dir, which is not a failure.
			return nil
		}
		// We encountered an actual error.
		return cbErr
	}

	// Let the workers handle everything else.
	w.addJob(Job{
		StartPath: relPath,
		Walker:    w,
	})

	// Wait for the workers to finish the job we just added.
	w.Wg.Wait()
	return nil
}
// addJob increments the job counter and pushes the path to the job queue.
func (w *Walker) addJob(job Job) {
w.Wg.Add(1)
@ -34,7 +76,6 @@ func (w *Walker) ReadPathAndQueue(relPath string) error {
fullPath := filepath.Join(w.root, relPath)
names, err := readDirNames(fullPath)
if err != nil {
log.Errorf("Walker:ReadPathAndQueue:readDirNames - %s", err)
return err
}
@ -43,16 +84,17 @@ func (w *Walker) ReadPathAndQueue(relPath string) error {
subPath := filepath.Join(relPath, name)
info, err := w.lstat(subPath)
// Print the error rather than stopping our scan and propagating the error up.
// We do this because failures here are alright and safe to ignore.
if err != nil {
if !errors.Is(err, file.RejectSymlinkErr) {
// Only print a warning if it's not a symlink error.
// We don't care about symlink-related errors and if we print them all we will overwhelm the console.
log.Warnf("Walker:ReadPathAndQueue - %s - %s", relPath, err)
}
continue
}
if info == nil {
log.Warnf("Walker:ReadPathAndQueue - %s - %s", relPath, err)
log.Warnf(`Walker:ReadPathAndQueue - lstat() was null for "%s"`, relPath)
continue
}
@ -61,9 +103,12 @@ func (w *Walker) ReadPathAndQueue(relPath string) error {
if errors.Is(err, filepath.SkipDir) {
return nil
}
if err != nil {
return err
}
// If this child is a directory, add it to the queue then move on.
if err == nil && info.Mode().IsDir() {
if info.Mode().IsDir() {
w.addJob(Job{
StartPath: subPath,
Walker: w,
@ -72,42 +117,3 @@ func (w *Walker) ReadPathAndQueue(relPath string) error {
}
return nil
}
// walk is the entry point of a queued walk: it stats relPath (relative to
// w.root), calls w.walkFunc on it, and then queues a single job for that
// directory and blocks on w.Wg.
// It is only called in one place: `queuedwalk()` in queuedwalk.go.
// NOTE: the walkFn parameter is accepted but never used; the callback that is
// actually invoked is w.walkFunc.
func (w *Walker) walk(relPath string, walkFn filepath.WalkFunc) error {
	// TODO: compare with filepath.WalkDir()

	info, statErr := w.lstat(relPath)
	if statErr != nil {
		return statErr
	}

	cbErr := w.walkFunc(filepath.Join(w.root, relPath), info, statErr)
	switch {
	case errors.Is(cbErr, filepath.SkipDir):
		// The callback asked to skip this directory.
		return nil
	case cbErr != nil:
		return cbErr
	case info == nil:
		return fmt.Errorf("broken symlink: %s", relPath)
	case !info.Mode().IsDir():
		// Only directories can be walked.
		return ErrNotDir
	}

	// Let the workers handle everything else.
	w.addJob(Job{
		StartPath: relPath,
		Walker:    w,
	})

	// Wait for the workers to finish reading the file system.
	w.Wg.Wait()
	return nil
}

View File

@ -15,8 +15,9 @@ import (
// readDirNames reads the directory named by dirname and returns
// a list of directory entries.
func readDirNames(dirname string) ([]string, error) {
f, err := os.Open(dirname)
func readDirNames(fullPath string) ([]string, error) {
file.RetardCheck(fullPath)
f, err := os.Open(fullPath)
if err != nil {
return nil, err
}
@ -34,9 +35,10 @@ func readDirNames(dirname string) ([]string, error) {
return names, nil
}
// lstat is a wrapper for os.Lstat which accepts a path relative to Walker.root and also rejects symlinks
// lstat is a wrapper for os.Lstat which accepts a path relative to Walker.root and also rejects symlinks.
func (w *Walker) lstat(relPath string) (info os.FileInfo, err error) {
fullPath := filepath.Join(w.root, relPath)
file.RetardCheck(fullPath)
info, err = os.Lstat(fullPath)
if err != nil {
return nil, err

View File

@ -18,7 +18,7 @@ func init() {
}
// ResponseItem is what is returned by the HTTP API as a JSON object.
// We don't return a `cacheitem.Item` because having a separate `responseitem`
// We don't return a `cacheitem.Item` because having a separate `ResponseItem`
// object allows us to customize the structure without messing with the original item.
type ResponseItem struct {
Path string `json:"path"`
@ -36,28 +36,6 @@ type ResponseItem struct {
}
func NewResponseItem(cacheItem *cacheitem.Item) *ResponseItem {
// TODO: this should never happen and can probably be removed.
// Problem was linked to the scenario where an item was not found in the cache
// so a new crawl was triggered but the `childItem` var was never updated.
//defer func() {
// if r := recover(); r != nil {
// copiedItem := &cacheitem.Item{
// Path: cacheitem.Path,
// Name: cacheitem.Name,
// Size: cacheitem.Size,
// Extension: cacheitem.Extension,
// Modified: cacheitem.Modified,
// Mode: cacheitem.Mode,
// IsDir: cacheitem.IsDir,
// IsSymlink: cacheitem.IsSymlink,
// Cached: cacheitem.Cached,
// Children: nil,
// MimeType: cacheitem.MimeType,
// }
// log.Fatalf("Recovered from panic: %s - %+v - %s", r, copiedItem, debug.Stack())
// }
//}()
newResponseItem := &ResponseItem{
Path: cacheItem.Path,
Name: cacheItem.Name,