track running crawls and add an admin page, use basic auth for admin, reject crawl if already running for a path, limit max directory crawlers, fix some issues

This commit is contained in:
Cyberes 2023-12-11 18:05:59 -07:00
parent a96708f6cf
commit 7078712bc3
21 changed files with 253 additions and 112 deletions

View File

@ -39,4 +39,4 @@ If you're using initial cache and have tons of files to scan you'll need at leas
so minutes for it to traverse the directory structure. CrazyFS is heavily threaded, so you'll want at least an 8-core so minutes for it to traverse the directory structure. CrazyFS is heavily threaded, so you'll want at least an 8-core
machine. machine.
CrazyFS works great with an HTTP cache in front of it. You'll need something line Nginx if you want SSL or HTTP. Also, CrazyFS works great with an HTTP cache in front of it.

View File

@ -1,8 +0,0 @@
The code you've posted is already quite efficient, but there are a few things you could consider to improve its performance:
1. **Use a more efficient file watcher:** The `github.com/radovskyb/watcher` package uses polling to detect file changes, which can be inefficient for large directories. If you're on Linux, consider using a package like `github.com/fsnotify/fsnotify` which uses inotify, a Linux kernel subsystem that provides more efficient file change notifications.
2. **Reduce the number of goroutines:** Each time a file change event is received, a new goroutine is created to handle it. This could potentially create a large number of goroutines if many file changes are happening at once. Consider using a worker pool pattern to limit the number of concurrent goroutines.
3. **Optimize your cache:** The LRU cache you're using is thread-safe, but it uses a mutex to achieve this. If you have a lot of contention (i.e., many goroutines trying to access the cache at once), this could slow things down. Consider using a sharded cache, which reduces contention by dividing the cache into several smaller caches, each with its own lock.
4. **Avoid unnecessary work:** If a file is created and then immediately modified, your code will crawl it twice. Consider adding a delay before crawling a file, and if another event for the same file is received during this delay, only crawl it once.
5. **Optimize your logging:** Writing to the log can be slow, especially if it's writing to a file or over the network. Consider using a buffered logger, which can improve performance by batching log messages together.
6. **Use a profiler:** The best way to find out where your code is spending its time is to use a profiler. The `net/http/pprof` package provides a simple way to add profiling endpoints to your application, which you can then view with the `go tool pprof` command.

View File

@ -10,12 +10,19 @@ import (
"time" "time"
) )
func NewItem(fullPath string, info os.FileInfo) *Item { func PathOutsideRoot(fullPath string) bool {
if !strings.HasPrefix(fullPath, config.GetConfig().RootDir) { return !strings.HasPrefix(fullPath, config.GetConfig().RootDir)
// Retard check }
panic(fmt.Sprintf("NewItem was not passed an absolute path. The path must start with the RootDir: %s", fullPath))
}
func RetardCheck(fullPath string) {
// Make sure we never do anything outside of the root dir.
if PathOutsideRoot(fullPath) {
panic(fmt.Sprintf("NewItem was not passed an absolute path. The path must start with the RootDir (%s). Failing path: %s", config.GetConfig().RootDir, fullPath))
}
}
func NewItem(fullPath string, info os.FileInfo) *Item {
RetardCheck(fullPath)
if config.GetConfig().CachePrintNew { if config.GetConfig().CachePrintNew {
log.Debugf("CACHE - new: %s", fullPath) log.Debugf("CACHE - new: %s", fullPath)
} }

View File

@ -60,7 +60,6 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache) dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
item, err := dc.CrawlNoRecursion(filepath.Join(config.GetConfig().RootDir, child)) item, err := dc.CrawlNoRecursion(filepath.Join(config.GetConfig().RootDir, child))
if err != nil { if err != nil {
log.Errorf("NewResponseItem - CrawlNoRecursion - %s", err) log.Errorf("NewResponseItem - CrawlNoRecursion - %s", err)
continue // skip this child continue // skip this child

View File

@ -6,34 +6,47 @@ import (
"crazyfs/cache/DirectoryCrawler" "crazyfs/cache/DirectoryCrawler"
"crazyfs/config" "crazyfs/config"
"crazyfs/elastic" "crazyfs/elastic"
"crypto/sha256"
"crypto/subtle"
"encoding/json" "encoding/json"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"net/http" "net/http"
) )
func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) { func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
auth := r.URL.Query().Get("auth") username, password, ok := r.BasicAuth()
if auth == "" || auth != config.GetConfig().HttpAdminKey { if ok {
helpers.Return403Msg("access denied", w) usernameHash := sha256.Sum256([]byte(username))
return passwordHash := sha256.Sum256([]byte(password))
} expectedUsernameHash := sha256.Sum256([]byte("admin"))
expectedPasswordHash := sha256.Sum256([]byte(config.GetConfig().HttpAdminKey))
usernameMatch := subtle.ConstantTimeCompare(usernameHash[:], expectedUsernameHash[:]) == 1
passwordMatch := subtle.ConstantTimeCompare(passwordHash[:], expectedPasswordHash[:]) == 1
cacheLen := sharedCache.Len() if !usernameMatch || !passwordMatch {
helpers.Return401Msg("unauthorized", w)
return
} else {
cacheLen := sharedCache.Len()
response := map[string]interface{}{ response := map[string]interface{}{
"cache_size": cacheLen, "cache_size": cacheLen,
"cache_max": config.GetConfig().CacheSize, "cache_max": config.GetConfig().CacheSize,
"crawls_running": DirectoryCrawler.GetGlobalActiveCrawls(), "crawls_running": DirectoryCrawler.GetTotalActiveCrawls(),
"active_workers": DirectoryCrawler.ActiveWorkers, "busy_workers": DirectoryCrawler.BusyWorkers,
"busy_workers": DirectoryCrawler.ActiveWalks, "new_sync_running": elastic.ElasticRefreshSyncRunning,
"new_sync_running": elastic.ElasticRefreshSyncRunning, "refresh_sync_running": elastic.ElasticRefreshSyncRunning,
"refresh_sync_running": elastic.ElasticRefreshSyncRunning, }
} w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(response)
err := json.NewEncoder(w).Encode(response) if err != nil {
if err != nil { log.Errorf("AdminCacheInfo - Failed to serialize JSON: %s", err)
log.Errorf("AdminCacheInfo - Failed to serialize JSON: %s", err) return
return }
return
}
} }
w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`)
helpers.Return401Msg("unauthorized", w)
} }

View File

@ -0,0 +1,46 @@
package api
import (
"crazyfs/CacheItem"
"crazyfs/api/helpers"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
"crypto/sha256"
"crypto/subtle"
"encoding/json"
lru "github.com/hashicorp/golang-lru/v2"
"net/http"
)
func AdminCrawlsInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
username, password, ok := r.BasicAuth()
if ok {
usernameHash := sha256.Sum256([]byte(username))
passwordHash := sha256.Sum256([]byte(password))
expectedUsernameHash := sha256.Sum256([]byte("admin"))
expectedPasswordHash := sha256.Sum256([]byte(config.GetConfig().HttpAdminKey))
usernameMatch := subtle.ConstantTimeCompare(usernameHash[:], expectedUsernameHash[:]) == 1
passwordMatch := subtle.ConstantTimeCompare(passwordHash[:], expectedPasswordHash[:]) == 1
if !usernameMatch || !passwordMatch {
helpers.Return401Msg("unauthorized", w)
return
} else {
response := map[string]interface{}{
"active": DirectoryCrawler.GetActiveCrawls(),
"finished": DirectoryCrawler.GetFinishedCrawls(),
}
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(response)
if err != nil {
log.Errorf("AdminCrawlsInfo - Failed to serialize JSON: %s", err)
helpers.Return500Msg(w)
return
}
return
}
}
w.Header().Set("WWW-Authenticate", `Basic realm="restricted", charset="UTF-8"`)
helpers.Return401Msg("unauthorized", w)
}

View File

@ -7,6 +7,7 @@ import (
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"encoding/json" "encoding/json"
"fmt"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"net/http" "net/http"
) )
@ -27,7 +28,7 @@ func AdminReCache(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache
auth := requestBody["auth"] auth := requestBody["auth"]
if auth == "" || auth != config.GetConfig().HttpAdminKey { if auth == "" || auth != config.GetConfig().HttpAdminKey {
helpers.Return403Msg("access denied", w) helpers.Return401Msg("unauthorized", w)
return return
} }
@ -42,16 +43,21 @@ func AdminReCache(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache
return return
} }
//relPath := cache.StripRootDir(fullPath, cfg.RootDir)
// Check and re-cache the directory
cache.Recache(fullPath, sharedCache)
response := map[string]interface{}{ response := map[string]interface{}{
"message": "Re-cache triggered for directory: " + fullPath, "message": "Re-cache triggered for directory: " + fullPath,
"error": nil,
} }
log.Infof("Admin triggered recache for %s", fullPath) // Check and re-cache the directory
err = cache.Recache(fullPath, sharedCache)
if err != nil {
response["message"] = fmt.Sprintf("recache failed")
response["error"] = err.Error()
w.WriteHeader(http.StatusConflict)
log.Errorf("Admin triggered recache for %s - %s", fullPath, err)
} else {
log.Infof("Admin triggered recache for %s", fullPath)
}
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(response) err = json.NewEncoder(w).Encode(response)
if err != nil { if err != nil {

View File

@ -16,7 +16,7 @@ func HealthCheck(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[
response := map[string]interface{}{} response := map[string]interface{}{}
response["scan_running"] = DirectoryCrawler.GetGlobalActiveCrawls() > 0 response["scan_running"] = DirectoryCrawler.GetTotalActiveCrawls() > 0
response["initial_scan_running"] = cache.InitialCrawlInProgress response["initial_scan_running"] = cache.InitialCrawlInProgress
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")

View File

@ -71,7 +71,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[s
if config.GetConfig().ElasticsearchEnable { if config.GetConfig().ElasticsearchEnable {
// Perform the Elasticsearch query // Perform the Elasticsearch query
resp, err := elastic.Search(queryString, excludeElements) resp, err := elastic.SimpleQuery(queryString, excludeElements)
if err != nil { if err != nil {
log.Errorf(`SEARCH - Failed to perform Elasticsearch query "%s" - %s`, queryString, err) log.Errorf(`SEARCH - Failed to perform Elasticsearch query "%s" - %s`, queryString, err)
helpers.Return500Msg(w) helpers.Return500Msg(w)

View File

@ -14,7 +14,7 @@ import (
func ClientHealthCheck(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) { func ClientHealthCheck(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
response := map[string]interface{}{} response := map[string]interface{}{}
response["scan_running"] = DirectoryCrawler.GetGlobalActiveCrawls() > 0 response["scan_running"] = DirectoryCrawler.GetTotalActiveCrawls() > 0
response["initial_scan_running"] = cache.InitialCrawlInProgress response["initial_scan_running"] = cache.InitialCrawlInProgress
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")

View File

@ -1,15 +1,11 @@
package helpers package helpers
import ( import (
"crazyfs/logging"
"encoding/json" "encoding/json"
"net/http" "net/http"
) )
func WriteErrorResponse(jsonCode, httpCode int, msg string, w http.ResponseWriter) { func WriteErrorResponse(jsonCode, httpCode int, msg string, w http.ResponseWriter) {
log := logging.GetLogger()
log.Warnln(msg)
w.Header().Set("Cache-Control", "no-store") w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(httpCode) w.WriteHeader(httpCode)
@ -25,7 +21,6 @@ func WriteErrorResponse(jsonCode, httpCode int, msg string, w http.ResponseWrite
} }
func ReturnFake404Msg(msg string, w http.ResponseWriter) { func ReturnFake404Msg(msg string, w http.ResponseWriter) {
log.Fatalf(msg)
WriteErrorResponse(404, http.StatusBadRequest, msg, w) WriteErrorResponse(404, http.StatusBadRequest, msg, w)
} }
@ -44,3 +39,7 @@ func Return500Msg(w http.ResponseWriter) {
func Return403Msg(msg string, w http.ResponseWriter) { func Return403Msg(msg string, w http.ResponseWriter) {
WriteErrorResponse(http.StatusForbidden, http.StatusForbidden, msg, w) WriteErrorResponse(http.StatusForbidden, http.StatusForbidden, msg, w)
} }
func Return401Msg(msg string, w http.ResponseWriter) {
WriteErrorResponse(http.StatusUnauthorized, http.StatusUnauthorized, msg, w)
}

View File

@ -47,7 +47,7 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
// Start a blocking non-recursive crawl. // Start a blocking non-recursive crawl.
item, err := dc.CrawlNoRecursion(fullPath) item, err := dc.CrawlNoRecursion(fullPath)
if os.IsNotExist(err) || item == nil { if err == nil && (os.IsNotExist(err) || item == nil) {
ReturnFake404Msg("path not found", w) ReturnFake404Msg("path not found", w)
return nil return nil
} else if err != nil { } else if err != nil {

View File

@ -65,6 +65,12 @@ var routes = Routes{
"/api/admin/cache/recache", "/api/admin/cache/recache",
wrongMethod("POST", AdminReCache), wrongMethod("POST", AdminReCache),
}, },
Route{
"Crawls Info",
"GET",
"/api/admin/crawls/info",
AdminCrawlsInfo,
},
Route{ Route{
"Server Health", "Server Health",
"GET", "GET",

View File

@ -8,10 +8,27 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"sync" "sync"
"sync/atomic" "time"
) )
var globalActiveCrawls int32 const maxFinishedCrawls = 100
var activeCrawls = make(map[string]*ActiveCrawl)
var finishedCrawls = make([]FinishedCrawl, 0, maxFinishedCrawls)
var activeCrawlsMutex = &sync.Mutex{}
var finishedCrawlsMutex = &sync.Mutex{}
type ActiveCrawl struct {
Path string `json:"path"`
Start int64 `json:"start"`
Elapsed int64 `json:"elapsed"`
}
type FinishedCrawl struct {
Path string `json:"path"`
Start int64 `json:"start"`
Elapsed int64 `json:"elapsed"`
}
type DirectoryCrawler struct { type DirectoryCrawler struct {
cache *lru.Cache[string, *CacheItem.Item] cache *lru.Cache[string, *CacheItem.Item]
@ -76,14 +93,49 @@ func isSubpath(path, subpath string) bool {
return true return true
} }
func (dc *DirectoryCrawler) incrementGlobalActiveCrawls() { func (dc *DirectoryCrawler) startCrawl(path string) bool {
atomic.AddInt32(&globalActiveCrawls, 1) if dc.IsCrawlActive(path) {
return false
}
activeCrawls[path] = &ActiveCrawl{Path: path, Start: time.Now().Unix(), Elapsed: int64(0)}
return true
} }
func (dc *DirectoryCrawler) decrementGlobalActiveCrawls() { func (dc *DirectoryCrawler) endCrawl(path string) {
atomic.AddInt32(&globalActiveCrawls, -1) activeCrawlsMutex.Lock()
finishedCrawlsMutex.Lock()
defer activeCrawlsMutex.Unlock()
defer finishedCrawlsMutex.Unlock()
if len(finishedCrawls) >= maxFinishedCrawls {
finishedCrawls = finishedCrawls[1:]
}
finishedCrawls = append(finishedCrawls, FinishedCrawl{Path: path, Start: activeCrawls[path].Start, Elapsed: int64(time.Since(time.Unix(activeCrawls[path].Start, 0)).Seconds())})
delete(activeCrawls, path)
} }
func GetGlobalActiveCrawls() int32 { func (dc *DirectoryCrawler) IsCrawlActive(path string) bool {
return atomic.LoadInt32(&globalActiveCrawls) activeCrawlsMutex.Lock()
defer activeCrawlsMutex.Unlock()
_, active := activeCrawls[path]
return active
}
func GetActiveCrawls() map[string]*ActiveCrawl {
activeCrawlsMutex.Lock()
defer activeCrawlsMutex.Unlock()
for path := range activeCrawls {
a := activeCrawls[path]
a.Elapsed = int64(time.Since(time.Unix(a.Start, 0)).Seconds())
}
return activeCrawls
}
func GetFinishedCrawls() []FinishedCrawl {
finishedCrawlsMutex.Lock()
defer finishedCrawlsMutex.Unlock()
return finishedCrawls
}
func GetTotalActiveCrawls() int {
return len(activeCrawls)
} }

View File

@ -1,6 +1,7 @@
package DirectoryCrawler package DirectoryCrawler
import ( import (
"crazyfs/config"
"errors" "errors"
"fmt" "fmt"
"os" "os"
@ -9,16 +10,11 @@ import (
"sync/atomic" "sync/atomic"
) )
var JobQueueSize int
// WorkerPool is a buffered channel acting as a semaphore to limit the number of active workers globally // WorkerPool is a buffered channel acting as a semaphore to limit the number of active workers globally
var WorkerPool chan struct{} var WorkerPool chan struct{}
// ActiveWorkers is an atomic counter for the number of active workers // BusyWorkers is an atomic counter for the number of active workers
var ActiveWorkers int32 var BusyWorkers int32
// ActiveWalks is an atomic counter for the number of active Walk crawls
var ActiveWalks int32
// ErrNotDir indicates that the path, which is being passed // ErrNotDir indicates that the path, which is being passed
// to a walker function, does not point to a directory // to a walker function, does not point to a directory
@ -109,11 +105,10 @@ func (w *Walker) processPath(relpath string) error {
continue continue
} }
w.walkFunc(filepath.Join(w.root, subpath), info, err) err = w.walkFunc(filepath.Join(w.root, subpath), info, err)
if errors.Is(err, filepath.SkipDir) {
//if err == filepath.SkipDir { return nil
// return nil }
//}
if info.Mode().IsDir() { if info.Mode().IsDir() {
w.addJob(subpath) w.addJob(subpath)
@ -138,36 +133,31 @@ func (w *Walker) addJob(path string) {
} }
} }
// worker processes all the jobs // worker processes all the jobs until the jobs channel is explicitly closed
// until the jobs channel is explicitly closed
func (w *Walker) worker() { func (w *Walker) worker() {
for path := range w.jobs { for path := range w.jobs {
WorkerPool <- struct{}{} // acquire a worker WorkerPool <- struct{}{} // acquire a worker
atomic.AddInt32(&ActiveWorkers, 1) // increment the number of active workers atomic.AddInt32(&BusyWorkers, 1) // increment the number of active workers
err := w.processPath(path) err := w.processPath(path)
if err != nil { if err != nil {
log.Warnf("worker - %s", err) log.Warnf("worker - %s", err)
} }
<-WorkerPool // release the worker when done <-WorkerPool // release the worker when done
atomic.AddInt32(&ActiveWorkers, -1) // decrement the number of active workers atomic.AddInt32(&BusyWorkers, -1) // decrement the number of active workers
} }
} }
// Walk recursively descends into subdirectories, // Walk recursively descends into subdirectories, calling walkFn for each file or directory
// calling walkFn for each file or directory
// in the tree, including the root directory. // in the tree, including the root directory.
func (w *Walker) Walk(relpath string, walkFn filepath.WalkFunc) error { func (w *Walker) Walk(relpath string, walkFn filepath.WalkFunc) error {
atomic.AddInt32(&ActiveWalks, 1) // increment the number of active Walk crawls w.jobs = make(chan string, config.GetConfig().DirectoryCrawlers)
defer atomic.AddInt32(&ActiveWalks, -1) // decrement the number of active Walk crawls when done
w.jobs = make(chan string, JobQueueSize)
w.walkFunc = walkFn w.walkFunc = walkFn
info, err := w.lstat(relpath) info, err := w.lstat(relpath)
err = w.walkFunc(filepath.Join(w.root, relpath), info, err) err = w.walkFunc(filepath.Join(w.root, relpath), info, err)
if err == filepath.SkipDir { if errors.Is(err, filepath.SkipDir) {
return nil return nil
} }
if err != nil { if err != nil {
@ -182,8 +172,8 @@ func (w *Walker) Walk(relpath string, walkFn filepath.WalkFunc) error {
return ErrNotDir return ErrNotDir
} }
// spawn workers // Spawn workers
for n := 1; n <= JobQueueSize; n++ { for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
go w.worker() go w.worker()
} }

View File

@ -4,34 +4,45 @@ import (
"crazyfs/CacheItem" "crazyfs/CacheItem"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"errors"
"fmt"
"os" "os"
"path/filepath" "path/filepath"
) )
func (dc *DirectoryCrawler) walkRecursiveFunc(path string, info os.FileInfo, err error) error { func (dc *DirectoryCrawler) walkRecursiveFunc(fullPath string, info os.FileInfo, err error) error {
processErr := dc.processPath(path, info) CacheItem.RetardCheck(fullPath)
processErr := dc.processPath(fullPath, info)
if processErr != nil { if processErr != nil {
log.Errorf("CRAWLER - walkRecursiveFunc() failed - %s - %s", processErr, path) log.Errorf("CRAWLER - walkRecursiveFunc() failed - %s - %s", processErr, fullPath)
return processErr return processErr
} }
return nil return nil
} }
func (dc *DirectoryCrawler) walkNonRecursiveFunc(path string, dir os.DirEntry, err error) error { func (dc *DirectoryCrawler) walkNonRecursiveFunc(fullPath string, dir os.DirEntry, err error) error {
CacheItem.RetardCheck(fullPath)
info, infoErr := dir.Info() info, infoErr := dir.Info()
if infoErr != nil { if infoErr != nil {
log.Errorf("CRAWLER - walkNonRecursiveFunc() - get info failed - %s - %s", infoErr, path) log.Errorf("CRAWLER - walkNonRecursiveFunc() - get info failed - %s - %s", infoErr, fullPath)
return infoErr return infoErr
} }
processErr := dc.processPath(path, info) processErr := dc.processPath(fullPath, info)
if processErr != nil { if processErr != nil {
log.Errorf("CRAWLER - walkNonRecursiveFunc() failed - %s - %s", processErr, path) log.Errorf("CRAWLER - walkNonRecursiveFunc() failed - %s - %s", processErr, fullPath)
return processErr return processErr
} }
return nil return nil
} }
func (dc *DirectoryCrawler) Crawl(fullPath string) error { func (dc *DirectoryCrawler) Crawl(fullPath string) error {
CacheItem.RetardCheck(fullPath)
readyToStart := dc.startCrawl(fullPath)
if !readyToStart {
return errors.New(fmt.Sprintf(`rejecting crawl, already in progress for "%s"`, fullPath))
}
defer dc.endCrawl(fullPath)
info, err := os.Lstat(fullPath) info, err := os.Lstat(fullPath)
if os.IsNotExist(err) { if os.IsNotExist(err) {
// If the path doesn't exist, just silently exit // If the path doesn't exist, just silently exit
@ -84,6 +95,13 @@ func (dc *DirectoryCrawler) Crawl(fullPath string) error {
// CrawlNoRecursion this function crawls a file or directory and does not recurse into any subdirectories. Also returns the result of the crawl. // CrawlNoRecursion this function crawls a file or directory and does not recurse into any subdirectories. Also returns the result of the crawl.
func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item, error) { func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item, error) {
CacheItem.RetardCheck(fullPath)
readyToStart := dc.startCrawl(fullPath)
if !readyToStart {
return nil, errors.New(fmt.Sprintf(`rejecting crawl, already in progress for "%s"`, fullPath))
}
defer dc.endCrawl(fullPath)
info, err := os.Lstat(fullPath) info, err := os.Lstat(fullPath)
if os.IsNotExist(err) { if os.IsNotExist(err) {
// If the path doesn't exist, just silently exit // If the path doesn't exist, just silently exit

View File

@ -44,10 +44,10 @@ func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGr
log.Infoln("CRAWLER - Starting a crawl...") log.Infoln("CRAWLER - Starting a crawl...")
start := time.Now() start := time.Now()
err := dc.Crawl(config.GetConfig().RootDir) err := dc.Crawl(config.GetConfig().RootDir)
duration := time.Since(start).Round(time.Second)
if err != nil { if err != nil {
log.Warnf("CRAWLER - Crawl failed: %s", err) log.Warnf("CRAWLER - Crawl failed: %s", err)
} else { } else {
duration := time.Since(start).Round(time.Second)
log.Infof("CRAWLER - Crawl completed in %s", duration) log.Infof("CRAWLER - Crawl completed in %s", duration)
log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(sharedCache.Keys())) log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(sharedCache.Keys()))
} }
@ -59,8 +59,8 @@ func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGr
func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], logFn func(format string, args ...interface{})) { func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], logFn func(format string, args ...interface{})) {
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
activeWorkers := int(DirectoryCrawler.ActiveWorkers) activeWorkers := int(DirectoryCrawler.BusyWorkers)
busyWorkers := int(DirectoryCrawler.ActiveWalks) runningCrawls := DirectoryCrawler.GetTotalActiveCrawls()
logFn("%s - %d/%d items in the cache. Active workers: %d Active crawls: %d", msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, activeWorkers, busyWorkers) logFn("%s - %d/%d items in the cache. Active workers: %d Active crawls: %d", msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, activeWorkers, runningCrawls)
} }
} }

24
src/cache/recache.go vendored
View File

@ -5,7 +5,7 @@ import (
"crazyfs/cache/DirectoryCrawler" "crazyfs/cache/DirectoryCrawler"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"crazyfs/logging" "errors"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"os" "os"
"path/filepath" "path/filepath"
@ -21,7 +21,6 @@ func InitRecacheSemaphore(limit int) {
func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) { func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
item, found := sharedCache.Get(path) item, found := sharedCache.Get(path)
if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 { if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 {
log := logging.GetLogger()
log.Debugf("Re-caching: %s", path) log.Debugf("Re-caching: %s", path)
sem <- struct{}{} // acquire a token sem <- struct{}{} // acquire a token
go func() { go func() {
@ -30,13 +29,18 @@ func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item
err := dc.Crawl(path) err := dc.Crawl(path)
if err != nil { if err != nil {
log.Errorf("RECACHE ERROR: %s", err.Error()) log.Errorf("RECACHE ERROR: %s", err.Error())
return
} }
}() }()
} }
} }
func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) { func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) error {
log := logging.GetLogger() dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
if dc.IsCrawlActive(path) {
return errors.New("rejecting crawl, already in progress for this path")
}
log.Debugf("Re-caching: %s", path) log.Debugf("Re-caching: %s", path)
start := time.Now() start := time.Now()
sem <- struct{}{} // acquire a token sem <- struct{}{} // acquire a token
@ -46,6 +50,7 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
err := dc.Crawl(path) err := dc.Crawl(path)
if err != nil { if err != nil {
log.Errorf("RECACHE ERROR: %s", err.Error()) log.Errorf("RECACHE ERROR: %s", err.Error())
return
} }
// Get the parent directory from the cache // Get the parent directory from the cache
@ -53,7 +58,7 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
parentDirRel := file.StripRootDir(parentDir) parentDirRel := file.StripRootDir(parentDir)
parentItem, found := sharedCache.Get(parentDirRel) parentItem, found := sharedCache.Get(parentDirRel)
if found { if found {
// Remove the old sub-directory from the parent directory's Children field // Remove the old subdirectory from the parent directory's Children field
for i, child := range parentItem.Children { for i, child := range parentItem.Children {
if child == path { if child == path {
parentItem.Children = append(parentItem.Children[:i], parentItem.Children[i+1:]...) parentItem.Children = append(parentItem.Children[:i], parentItem.Children[i+1:]...)
@ -61,10 +66,11 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
} }
} }
// Update the parent directory's Children field to include the new sub-directory // Update the parent directory's Children field to include the new subdirectory
info, err := os.Stat(path) info, err := os.Stat(path)
if err != nil { if err != nil {
log.Errorf("RECACHE ERROR: %s", err.Error()) log.Errorf("RECACHE ERROR: %s", err.Error())
return
} else { } else {
newItem := CacheItem.NewItem(path, info) newItem := CacheItem.NewItem(path, info)
// Create a new slice that contains all items from the Children field except the old directory // Create a new slice that contains all items from the Children field except the old directory
@ -81,15 +87,17 @@ func Recache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
// Update the parent directory in the cache // Update the parent directory in the cache
sharedCache.Add(parentDir, parentItem) sharedCache.Add(parentDir, parentItem)
} }
} else { } else if !CacheItem.PathOutsideRoot(parentDir) {
// If the parent directory isn't in the cache, crawl it // If the parent directory isn't in the cache, crawl it
log.Infof("RECACHE - crawling parent directory since it isn't in the cache yet: %s", parentDir) log.Infof("RECACHE - crawling parent directory since it isn't in the cache yet: %s", parentDir)
err := dc.Crawl(parentDir) _, err := dc.CrawlNoRecursion(parentDir)
if err != nil { if err != nil {
log.Errorf("RECACHE ERROR: %s", err.Error()) log.Errorf("RECACHE ERROR: %s", err.Error())
return
} }
} }
duration := time.Since(start).Round(time.Second) duration := time.Since(start).Round(time.Second)
log.Infof("RECACHE - completed in %s - %s", duration, path) log.Infof("RECACHE - completed in %s - %s", duration, path)
}() }()
return nil
} }

View File

@ -39,6 +39,14 @@ type cliConfig struct {
// TODO: admin api endpoint to get status and progress of the full refresh of elasticsearch // TODO: admin api endpoint to get status and progress of the full refresh of elasticsearch
func main() { func main() {
//fullPath := "/srv/chub-archive"
//RootDir := "/srv/chub-archive"
//
//fmt.Println(strings.HasPrefix(fullPath, RootDir))
////fmt.Println(fullPath != RootDir)
//
//return
cliArgs := parseArgs() cliArgs := parseArgs()
if cliArgs.help { if cliArgs.help {
flag.Usage() flag.Usage()

View File

@ -10,7 +10,7 @@ import (
"strings" "strings"
) )
func Search(query string, exclude []string) (*esapi.Response, error) { func SimpleQuery(query string, exclude []string) (*esapi.Response, error) {
var excludeQuery string var excludeQuery string
if len(exclude) > 0 { if len(exclude) > 0 {
var excludeConditions []string var excludeConditions []string

View File

@ -1,13 +1,10 @@
- Track active crawls and list them on the admin page
- Limit to one on-demand crawl per path. Don't start another if one is already running. See HandleFileNotFound() - Limit to one on-demand crawl per path. Don't start another if one is already running. See HandleFileNotFound()
- Add config value to limit the number of on-demand crawls - Add config value to limit the number of on-demand crawls
- Add config value to limit the number of concurrent crawls, other crawls get queued. - Add config value to limit the number of concurrent crawls, other crawls get queued.
- add an admin endpoint to fetch the last n modified files.
- fix /api/file/download when an item is in the cache but does not exist on the disk
- Is using scroll for the Elastic query really the best way to do a real-time query?
Later: Later:
- Add a wildcard option to restricted_download_paths to block all sub-directories - Add a wildcard option to restricted_download_paths to block all sub-directories
- Add a dict to each restricted_download_paths item to specify how many levels recursive the block should be applied - Add a dict to each restricted_download_paths item to specify how many levels recursive the block should be applied
- add a "last modified" to "sort" https://chub-archive.evulid.cc/api/file/list?path=/chub.ai/characters&page=1&limit=50&sort=folders - add a "last modified" to "sort" https://chub-archive.evulid.cc/api/file/list?path=/chub.ai/characters&page=1&limit=50&sort=folders
- add an admin endpoint to fetch the last n modified files.