rename all modules to lowercase

This commit is contained in:
Cyberes 2024-03-17 12:22:02 -06:00
parent 8a21665e83
commit 92e55ca319
38 changed files with 183 additions and 186 deletions

2
.gitignore vendored
View File

@ -2,7 +2,7 @@
config.yml config.yml
config.yaml config.yaml
build/ build/
src/crazyfs dist/
# ---> Go # ---> Go
# If you prefer the WHITElist template instead of the BLACKlist, see community template: # If you prefer the WHITElist template instead of the BLACKlist, see community template:

View File

@ -1,21 +1,21 @@
package helpers package helpers
import ( import (
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/DirectoryCrawler" "crazyfs/directorycrawler"
"crazyfs/SharedCache"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/sharedcache"
"net/http" "net/http"
"os" "os"
"time" "time"
) )
// HandleFileNotFound if the data is not in the cache, start a new crawler // HandleFileNotFound if the data is not in the cache, start a new crawler
func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter) *CacheItem.Item { func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter) *cacheitem.Item {
// TODO: implement some sort of backoff or delay for repeated calls to recache the same path. // TODO: implement some sort of backoff or delay for repeated calls to recache the same path.
log.Debugf(`HELPERS:HandleFileNotFound:Crawl - Not in cache, crawling: "%s"`, fullPath) log.Debugf(`HELPERS:HandleFileNotFound:Crawl - Not in cache, crawling: "%s"`, fullPath)
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
// Check if this is a symlink. We do this before CrawlNoRecursion() because we want to tell the end user that // Check if this is a symlink. We do this before CrawlNoRecursion() because we want to tell the end user that
// we're not going to resolve this symlink. // we're not going to resolve this symlink.
@ -47,7 +47,7 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
} }
// Try to get the data from the cache again. // Try to get the data from the cache again.
item, found := SharedCache.Cache.Get(relPath) item, found := sharedcache.Cache.Get(relPath)
if !found { if !found {
// TODO: let's not re-check the disk if the file is still not in the cache. Instead, let's just assume that it doesn't exist. // TODO: let's not re-check the disk if the file is still not in the cache. Instead, let's just assume that it doesn't exist.
ReturnFake404Msg("path not found", w) ReturnFake404Msg("path not found", w)
@ -68,7 +68,7 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
//} //}
} }
// If CacheItem is still nil, error // If cacheitem is still nil, error
if item == nil { if item == nil {
log.Errorf("HELPERS:HandleFileNotFound:Crawl - Failed to find %s and did not return a 404", relPath) log.Errorf("HELPERS:HandleFileNotFound:Crawl - Failed to find %s and did not return a 404", relPath)
Return500Msg(w) Return500Msg(w)
@ -84,7 +84,7 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
// Start a recursive crawl in the background. // Start a recursive crawl in the background.
go func() { go func() {
log.Debugf("HELPERS:HandleFileNotFound:Crawl - Starting background recursive crawl for %s", fullPath) log.Debugf("HELPERS:HandleFileNotFound:Crawl - Starting background recursive crawl for %s", fullPath)
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
start := time.Now() start := time.Now()
err := dc.Crawl(fullPath, nil) err := dc.Crawl(fullPath, nil)
if err != nil { if err != nil {

View File

@ -1,8 +1,8 @@
package helpers package helpers
import ( import (
"crazyfs/SharedCache"
"crazyfs/file" "crazyfs/file"
"crazyfs/sharedcache"
kzip "github.com/klauspost/compress/zip" kzip "github.com/klauspost/compress/zip"
"io" "io"
"net/http" "net/http"
@ -15,7 +15,7 @@ func ZipHandlerCompress(dirPath string, w http.ResponseWriter, r *http.Request)
//w.WriteHeader(http.StatusOK) //w.WriteHeader(http.StatusOK)
zipWriter := kzip.NewWriter(w) zipWriter := kzip.NewWriter(w)
// Walk through the directory and add each file to the zip // queuedwalk through the directory and add each file to the zip
filepath.Walk(dirPath, func(filePath string, info os.FileInfo, err error) error { filepath.Walk(dirPath, func(filePath string, info os.FileInfo, err error) error {
if info.IsDir() { if info.IsDir() {
return nil return nil
@ -49,12 +49,12 @@ func ZipHandlerCompress(dirPath string, w http.ResponseWriter, r *http.Request)
} }
func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request) { func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request) {
zipWriter := kzip.NewWriter(w) zipWriter := kzip.NewWriter(w)
// Walk through each file and add it to the zip // queuedwalk through each file and add it to the zip
for _, fullPath := range paths { for _, fullPath := range paths {
relPath := file.StripRootDir(fullPath) relPath := file.StripRootDir(fullPath)
// Try to get the data from the cache // Try to get the data from the cache
item, found := SharedCache.Cache.Get(relPath) item, found := sharedcache.Cache.Get(relPath)
if !found { if !found {
item = HandleFileNotFound(relPath, fullPath, w) item = HandleFileNotFound(relPath, fullPath, w)
} }
@ -87,7 +87,7 @@ func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.R
w.Header().Set("Content-Type", "application/zip") w.Header().Set("Content-Type", "application/zip")
//w.WriteHeader(http.StatusOK) //w.WriteHeader(http.StatusOK)
// If it's a directory, walk through it and add each file to the zip // If it's a directory, queuedwalk through it and add each file to the zip
filepath.Walk(fullPath, func(filePath string, info os.FileInfo, err error) error { filepath.Walk(fullPath, func(filePath string, info os.FileInfo, err error) error {
if info.IsDir() { if info.IsDir() {
return nil return nil

View File

@ -1,9 +1,9 @@
package routes package routes
import ( import (
"crazyfs/CacheItem"
"crazyfs/api/helpers" "crazyfs/api/helpers"
"crazyfs/cache" "crazyfs/cache"
"crazyfs/cacheitem"
"crazyfs/config" "crazyfs/config"
"crazyfs/elastic" "crazyfs/elastic"
"crazyfs/logging" "crazyfs/logging"
@ -65,8 +65,8 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
searchStart := time.Now() searchStart := time.Now()
var results []*CacheItem.Item var results []*cacheitem.Item
results = make([]*CacheItem.Item, 0) results = make([]*cacheitem.Item, 0)
if config.GetConfig().ElasticsearchEnable { if config.GetConfig().ElasticsearchEnable {
// Perform the Elasticsearch query // Perform the Elasticsearch query
@ -102,7 +102,7 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
if respData["hits"] != nil { if respData["hits"] != nil {
// Extract the results from the Elasticsearch response // Extract the results from the Elasticsearch response
hits := respData["hits"].(map[string]interface{})["hits"].([]interface{}) hits := respData["hits"].(map[string]interface{})["hits"].([]interface{})
items := make([]*CacheItem.Item, len(hits)) items := make([]*cacheitem.Item, len(hits))
for i, hit := range hits { for i, hit := range hits {
itemSource := hit.(map[string]interface{})["_source"].(map[string]interface{}) itemSource := hit.(map[string]interface{})["_source"].(map[string]interface{})
@ -119,7 +119,7 @@ func APISearch(w http.ResponseWriter, r *http.Request) {
} }
//score := hit.(map[string]interface{})["_score"].(float64) //score := hit.(map[string]interface{})["_score"].(float64)
item := &CacheItem.Item{ item := &cacheitem.Item{
Path: itemSource["path"].(string), Path: itemSource["path"].(string),
Name: itemSource["name"].(string), Name: itemSource["name"].(string),
Size: int64(itemSource["size"].(float64)), Size: int64(itemSource["size"].(float64)),

View File

@ -1,9 +1,9 @@
package admin package admin
import ( import (
"crazyfs/SharedCache"
"crazyfs/api/helpers" "crazyfs/api/helpers"
"crazyfs/config" "crazyfs/config"
"crazyfs/sharedcache"
"crypto/sha256" "crypto/sha256"
"crypto/subtle" "crypto/subtle"
"net/http" "net/http"
@ -23,7 +23,7 @@ func APIAdminCacheInfo(w http.ResponseWriter, r *http.Request) {
helpers.Return401Msg("unauthorized", w) helpers.Return401Msg("unauthorized", w)
return return
} else { } else {
cacheLen := SharedCache.Cache.Len() cacheLen := sharedcache.Cache.Len()
response := map[string]interface{}{ response := map[string]interface{}{
"cachedItems": cacheLen, "cachedItems": cacheLen,
"cacheMax": config.GetConfig().CacheSize, "cacheMax": config.GetConfig().CacheSize,

View File

@ -1,9 +1,9 @@
package admin package admin
import ( import (
"crazyfs/DirectoryCrawler"
"crazyfs/api/helpers" "crazyfs/api/helpers"
"crazyfs/config" "crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/elastic" "crazyfs/elastic"
"crazyfs/globals" "crazyfs/globals"
"crypto/sha256" "crypto/sha256"
@ -38,8 +38,8 @@ func APIAdminCrawlsInfo(w http.ResponseWriter, r *http.Request) {
response := map[string]interface{}{ response := map[string]interface{}{
"crawls": map[string]interface{}{ "crawls": map[string]interface{}{
"active": DirectoryCrawler.GetActiveCrawls(), "active": directorycrawler.GetActiveCrawls(),
"finished": DirectoryCrawler.GetFinishedCrawls(), "finished": directorycrawler.GetFinishedCrawls(),
}, },
"crawlWorkers": map[string]interface{}{ "crawlWorkers": map[string]interface{}{
"busy": atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), "busy": atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers),

View File

@ -1,15 +1,15 @@
package client package client
import ( import (
"crazyfs/DirectoryCrawler"
"crazyfs/api/helpers" "crazyfs/api/helpers"
"crazyfs/cache" "crazyfs/cache"
"crazyfs/directorycrawler"
"net/http" "net/http"
) )
func APIHealthCheck(w http.ResponseWriter, r *http.Request) { func APIHealthCheck(w http.ResponseWriter, r *http.Request) {
response := map[string]interface{}{} response := map[string]interface{}{}
response["scanRunning"] = DirectoryCrawler.GetTotalActiveCrawls() > 0 response["scanRunning"] = directorycrawler.GetTotalActiveCrawls() > 0
response["initialScanRunning"] = cache.InitialCrawlInProgress response["initialScanRunning"] = cache.InitialCrawlInProgress
helpers.WriteJsonResponse(response, false, w, r) helpers.WriteJsonResponse(response, false, w, r)
} }

View File

@ -1,10 +1,10 @@
package file package file
import ( import (
"crazyfs/SharedCache"
"crazyfs/api/helpers" "crazyfs/api/helpers"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"crazyfs/sharedcache"
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
@ -66,7 +66,7 @@ func APIDownload(w http.ResponseWriter, r *http.Request) {
} }
// Try to get the data from the cache // Try to get the data from the cache
item, found := SharedCache.Cache.Get(relPath) item, found := sharedcache.Cache.Get(relPath)
if !found { if !found {
item = helpers.HandleFileNotFound(relPath, fullPath, w) item = helpers.HandleFileNotFound(relPath, fullPath, w)
} }
@ -102,9 +102,9 @@ func APIDownload(w http.ResponseWriter, r *http.Request) {
log.Errorf("ROUTES:Download - Error detecting MIME type: %v", err) log.Errorf("ROUTES:Download - Error detecting MIME type: %v", err)
} else if mimeType != "" { } else if mimeType != "" {
// GetMimeType() returns an empty string if it was a directory. // GetMimeType() returns an empty string if it was a directory.
// Update the CacheItem's MIME in the sharedCache. // Update the cacheitem's MIME in the sharedCache.
item.MimeType = &mimeType item.MimeType = &mimeType
SharedCache.Cache.Add(relPath, item) sharedcache.Cache.Add(relPath, item)
} else { } else {
log.Errorf("ROUTES:Download - Failed to match a condition when checking a file's MIME - %s", fullPath) log.Errorf("ROUTES:Download - Failed to match a condition when checking a file's MIME - %s", fullPath)
helpers.Return500Msg(w) helpers.Return500Msg(w)
@ -146,7 +146,7 @@ func APIDownload(w http.ResponseWriter, r *http.Request) {
// Open the file // Open the file
openFile, err := os.Open(fullPath) openFile, err := os.Open(fullPath)
if err != nil { if err != nil {
SharedCache.Cache.Remove(relPath) // remove it from the cache sharedcache.Cache.Remove(relPath) // remove it from the cache
helpers.ReturnFake404Msg("file missing from disk, cache out of date", w) helpers.ReturnFake404Msg("file missing from disk, cache out of date", w)
return return
} }

View File

@ -1,11 +1,11 @@
package file package file
import ( import (
"crazyfs/ResponseItem"
"crazyfs/SharedCache"
"crazyfs/api/helpers" "crazyfs/api/helpers"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"crazyfs/responseitem"
"crazyfs/sharedcache"
"net/http" "net/http"
"strconv" "strconv"
) )
@ -45,7 +45,7 @@ func APIList(w http.ResponseWriter, r *http.Request) {
// Try to get the data from the cache // Try to get the data from the cache
relPath := file.StripRootDir(fullPath) relPath := file.StripRootDir(fullPath)
cacheItem, found := SharedCache.Cache.Get(relPath) cacheItem, found := sharedcache.Cache.Get(relPath)
if !found { if !found {
cacheItem = helpers.HandleFileNotFound(relPath, fullPath, w) cacheItem = helpers.HandleFileNotFound(relPath, fullPath, w)
} }
@ -73,16 +73,16 @@ func APIList(w http.ResponseWriter, r *http.Request) {
helpers.Return500Msg(w) helpers.Return500Msg(w)
return return
} }
// Update the original cached CacheItem's MIME in the sharedCache // Update the original cached cacheitem's MIME in the sharedCache
cacheItem.MimeType = &mimeType cacheItem.MimeType = &mimeType
cacheItem.Extension = &ext cacheItem.Extension = &ext
SharedCache.Cache.Add(relPath, cacheItem) // take the address of CacheItem sharedcache.Cache.Add(relPath, cacheItem) // take the address of cacheitem
} }
} }
} }
// Create a copy of the cached Item, so we don't modify the Item in the cache // Create a copy of the cached Item, so we don't modify the Item in the cache
item := ResponseItem.NewResponseItem(cacheItem) item := responseitem.NewResponseItem(cacheItem)
response := map[string]interface{}{} response := map[string]interface{}{}
@ -114,7 +114,7 @@ func APIList(w http.ResponseWriter, r *http.Request) {
} }
if folderSorting == "folders" { if folderSorting == "folders" {
var dirs, files []*ResponseItem.ResponseItem var dirs, files []*responseitem.ResponseItem
for _, child := range item.Children { for _, child := range item.Children {
if child.IsDir { if child.IsDir {
dirs = append(dirs, child) dirs = append(dirs, child)
@ -126,11 +126,11 @@ func APIList(w http.ResponseWriter, r *http.Request) {
} }
// Set the children to an empty array so that the JSON encoder doesn't return it as nil. // Set the children to an empty array so that the JSON encoder doesn't return it as nil.
var paginatedChildren []*ResponseItem.ResponseItem // this var will be either the full CacheItem list or a paginated list depending on the query args var paginatedChildren []*responseitem.ResponseItem // this var will be either the full cacheitem list or a paginated list depending on the query args
if item.Children != nil { if item.Children != nil {
paginatedChildren = item.Children paginatedChildren = item.Children
} else { } else {
paginatedChildren = make([]*ResponseItem.ResponseItem, 0) paginatedChildren = make([]*responseitem.ResponseItem, 0)
} }
pageParam := r.URL.Query().Get("page") pageParam := r.URL.Query().Get("page")

View File

@ -2,12 +2,12 @@ package file
import ( import (
"bytes" "bytes"
"crazyfs/SharedCache"
"crazyfs/api/helpers" "crazyfs/api/helpers"
"crazyfs/cache" "crazyfs/cache"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"crazyfs/logging" "crazyfs/logging"
"crazyfs/sharedcache"
"fmt" "fmt"
"github.com/disintegration/imaging" "github.com/disintegration/imaging"
"github.com/nfnt/resize" "github.com/nfnt/resize"
@ -62,7 +62,7 @@ func APIThumbnail(w http.ResponseWriter, r *http.Request) {
} }
// Try to get the data from the cache // Try to get the data from the cache
item, found := SharedCache.Cache.Get(relPath) item, found := sharedcache.Cache.Get(relPath)
if !found { if !found {
item = helpers.HandleFileNotFound(relPath, fullPath, w) item = helpers.HandleFileNotFound(relPath, fullPath, w)
} }
@ -87,10 +87,10 @@ func APIThumbnail(w http.ResponseWriter, r *http.Request) {
returnDummyPNG(w) returnDummyPNG(w)
return return
} }
// Update the CacheItem's MIME in the sharedCache // Update the cacheitem's MIME in the sharedCache
item.MimeType = &mimeType item.MimeType = &mimeType
item.Extension = &ext item.Extension = &ext
SharedCache.Cache.Add(relPath, item) sharedcache.Cache.Add(relPath, item)
// Check if the file is an image // Check if the file is an image
if !strings.HasPrefix(mimeType, "image/") { if !strings.HasPrefix(mimeType, "image/") {

16
src/cache/crawler.go vendored
View File

@ -1,10 +1,10 @@
package cache package cache
import ( import (
"crazyfs/DirectoryCrawler"
"crazyfs/SharedCache"
"crazyfs/config" "crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/sharedcache"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -30,7 +30,7 @@ func startCrawl(wg *sync.WaitGroup) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
log.Infoln("CRAWLER - Recurring - Starting a crawl...") log.Infoln("CRAWLER - Recurring - Starting a crawl...")
start := time.Now() start := time.Now()
err := dc.Crawl(config.GetConfig().RootDir, nil) err := dc.Crawl(config.GetConfig().RootDir, nil)
@ -39,7 +39,7 @@ func startCrawl(wg *sync.WaitGroup) {
} else { } else {
duration := time.Since(start).Round(time.Second) duration := time.Since(start).Round(time.Second)
log.Infof("CRAWLER - Recurring - Crawl completed in %s", duration) log.Infof("CRAWLER - Recurring - Crawl completed in %s", duration)
log.Debugf("%d/%d items in the cache.", len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize) log.Debugf("%d/%d items in the cache.", len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize)
} }
}() }()
} }
@ -49,13 +49,13 @@ func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, a
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
if !config.GetConfig().ElasticsearchSyncEnable || InitialCrawlInProgress { if !config.GetConfig().ElasticsearchSyncEnable || InitialCrawlInProgress {
logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d." logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr, logFn(logStr,
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls()) msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls())
} else { } else {
logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d. Busy Elastic delete workers: %d. Elastic deletes queued: %d" logStr := "%s - %d/%d items in the cache. Busy workers: %d. Jobs queued: %d. Running crawls: %d. Busy Elastic delete workers: %d. Elastic deletes queued: %d"
logFn(logStr, logFn(logStr,
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls(), globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize()) msg, len(sharedcache.Cache.Keys()), config.GetConfig().CacheSize, atomic.LoadInt32(&globals.DirectoryCrawlers.BusyWorkers), globals.DirectoryCrawlers.Queue.GetQueueSize(), directorycrawler.GetTotalActiveCrawls(), globals.ElasticCrawlers.BusyWorkers, globals.ElasticCrawlers.Queue.GetQueueSize())
} }
} }
} }

View File

@ -1,8 +1,8 @@
package cache package cache
import ( import (
"crazyfs/DirectoryCrawler"
"crazyfs/config" "crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/logging" "crazyfs/logging"
"sync" "sync"
@ -28,7 +28,7 @@ func InitialCrawl() {
InitialCrawlLock.Lock() InitialCrawlLock.Lock()
InitialCrawlInProgress = true InitialCrawlInProgress = true
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
//start := time.Now() //start := time.Now()
err := dc.Crawl(config.GetConfig().RootDir, nil) err := dc.Crawl(config.GetConfig().RootDir, nil)
if err != nil { if err != nil {

20
src/cache/recache.go vendored
View File

@ -1,12 +1,12 @@
package cache package cache
import ( import (
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/DirectoryCrawler"
"crazyfs/SharedCache"
"crazyfs/config" "crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/file" "crazyfs/file"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/sharedcache"
"errors" "errors"
"os" "os"
"path/filepath" "path/filepath"
@ -20,13 +20,13 @@ func InitRecacheSemaphore(limit int) {
} }
func CheckAndRecache(path string) { func CheckAndRecache(path string) {
item, found := SharedCache.Cache.Get(path) item, found := sharedcache.Cache.Get(path)
if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 { if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 {
log.Debugf("CACHE:Recache - re-caching: %s", path) log.Debugf("CACHE:Recache - re-caching: %s", path)
sem <- struct{}{} // acquire a token sem <- struct{}{} // acquire a token
go func() { go func() {
defer func() { <-sem }() // release the token when done defer func() { <-sem }() // release the token when done
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
err := dc.Crawl(path, nil) err := dc.Crawl(path, nil)
if err != nil { if err != nil {
log.Errorf("CACHE:Recache - %s", err.Error()) log.Errorf("CACHE:Recache - %s", err.Error())
@ -37,7 +37,7 @@ func CheckAndRecache(path string) {
} }
func Recache(path string) error { func Recache(path string) error {
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
if dc.IsCrawlActive(path, nil) { if dc.IsCrawlActive(path, nil) {
return errors.New("rejecting crawl, already in progress for this path") return errors.New("rejecting crawl, already in progress for this path")
} }
@ -47,7 +47,7 @@ func Recache(path string) error {
sem <- struct{}{} // acquire a token sem <- struct{}{} // acquire a token
go func() { go func() {
defer func() { <-sem }() // release the token when done defer func() { <-sem }() // release the token when done
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
err := dc.Crawl(path, nil) err := dc.Crawl(path, nil)
if err != nil { if err != nil {
log.Errorf("CACHE:Recache - %s", err.Error()) log.Errorf("CACHE:Recache - %s", err.Error())
@ -57,7 +57,7 @@ func Recache(path string) error {
// Get the parent directory from the cache // Get the parent directory from the cache
parentDir := filepath.Dir(path) parentDir := filepath.Dir(path)
parentDirRel := file.StripRootDir(parentDir) parentDirRel := file.StripRootDir(parentDir)
parentItem, found := SharedCache.Cache.Get(parentDirRel) parentItem, found := sharedcache.Cache.Get(parentDirRel)
if found { if found {
// Remove the old subdirectory from the parent directory's Children field // Remove the old subdirectory from the parent directory's Children field
for i, child := range parentItem.Children { for i, child := range parentItem.Children {
@ -73,7 +73,7 @@ func Recache(path string) error {
log.Errorf("CACHE:Recache - %s", err.Error()) log.Errorf("CACHE:Recache - %s", err.Error())
return return
} else { } else {
newItem := CacheItem.NewItem(path, info) newItem := cacheitem.NewItem(path, info)
// Create a new slice that contains all items from the Children field except the old directory // Create a new slice that contains all items from the Children field except the old directory
newChildren := make([]string, 0, len(parentItem.Children)) newChildren := make([]string, 0, len(parentItem.Children))
for _, child := range parentItem.Children { for _, child := range parentItem.Children {
@ -86,7 +86,7 @@ func Recache(path string) error {
// Assign the newChildren slice to the Children field // Assign the newChildren slice to the Children field
parentItem.Children = newChildren parentItem.Children = newChildren
// Update the parent directory in the cache // Update the parent directory in the cache
SharedCache.Cache.Add(parentDir, parentItem) sharedcache.Cache.Add(parentDir, parentItem)
} }
} else if !file.PathOutsideRoot(parentDir) { } else if !file.PathOutsideRoot(parentDir) {
// If the parent directory isn't in the cache, crawl it. // If the parent directory isn't in the cache, crawl it.

26
src/cache/search.go vendored
View File

@ -2,24 +2,24 @@ package cache
import ( import (
"bytes" "bytes"
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/SharedCache"
"crazyfs/config" "crazyfs/config"
"crazyfs/sharedcache"
"encoding/gob" "encoding/gob"
"strings" "strings"
) )
func SearchLRU(queryString string, excludeElements []string, limitResults int) []*CacheItem.Item { func SearchLRU(queryString string, excludeElements []string, limitResults int) []*cacheitem.Item {
results := make([]*CacheItem.Item, 0) results := make([]*cacheitem.Item, 0)
const maxGoroutines = 100 const maxGoroutines = 100
// Create a buffered channel as a semaphore // Create a buffered channel as a semaphore
sem := make(chan struct{}, maxGoroutines) sem := make(chan struct{}, maxGoroutines)
resultsChan := make(chan *CacheItem.Item, len(SharedCache.Cache.Keys())) resultsChan := make(chan *cacheitem.Item, len(sharedcache.Cache.Keys()))
for _, key := range SharedCache.Cache.Keys() { for _, key := range sharedcache.Cache.Keys() {
searchKey(key, queryString, excludeElements, sem, resultsChan) searchKey(key, queryString, excludeElements, sem, resultsChan)
} }
@ -28,7 +28,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int) [
sem <- struct{}{} sem <- struct{}{}
} }
for range SharedCache.Cache.Keys() { for range sharedcache.Cache.Keys() {
item := <-resultsChan item := <-resultsChan
if item != nil { if item != nil {
results = append(results, item) results = append(results, item)
@ -41,7 +41,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int) [
return results return results
} }
func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *CacheItem.Item) { func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *cacheitem.Item) {
// Acquire a token // Acquire a token
sem <- struct{}{} sem <- struct{}{}
@ -49,7 +49,7 @@ func searchKey(key string, queryString string, excludeElements []string, sem cha
// Release the token at the end // Release the token at the end
defer func() { <-sem }() defer func() { <-sem }()
cacheItem, found := SharedCache.Cache.Get(key) cacheItem, found := sharedcache.Cache.Get(key)
if !found { if !found {
resultsChan <- nil resultsChan <- nil
return return
@ -70,20 +70,20 @@ func searchKey(key string, queryString string, excludeElements []string, sem cha
return return
} }
// Create a deep copy of the CacheItem // Create a deep copy of the cacheitem
var buf bytes.Buffer var buf bytes.Buffer
enc := gob.NewEncoder(&buf) enc := gob.NewEncoder(&buf)
dec := gob.NewDecoder(&buf) dec := gob.NewDecoder(&buf)
err := enc.Encode(cacheItem) err := enc.Encode(cacheItem)
if err != nil { if err != nil {
log.Printf("CACHE:searchKey - Error encoding CacheItem: %v", err) log.Printf("CACHE:searchKey - Error encoding cacheitem: %v", err)
resultsChan <- nil resultsChan <- nil
return return
} }
var item CacheItem.Item var item cacheitem.Item
err = dec.Decode(&item) err = dec.Decode(&item)
if err != nil { if err != nil {
log.Printf("CACHE:searchKey - Error decoding CacheItem: %v", err) log.Printf("CACHE:searchKey - Error decoding cacheitem: %v", err)
resultsChan <- nil resultsChan <- nil
return return
} }

View File

@ -1,4 +1,4 @@
package CacheItem package cacheitem
type Item struct { type Item struct {
Path string `json:"path"` Path string `json:"path"`

View File

@ -1,4 +1,4 @@
package CacheItem package cacheitem
import ( import (
"crazyfs/config" "crazyfs/config"

View File

@ -1,4 +1,4 @@
package CacheItem package cacheitem
import ( import (
"crazyfs/logging" "crazyfs/logging"

View File

@ -1,9 +1,9 @@
package Crawlers package crawlers
import ( import (
"crazyfs/Workers"
"crazyfs/config" "crazyfs/config"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/workers"
"sync/atomic" "sync/atomic"
) )
@ -11,7 +11,7 @@ func InitializeDirectoryCrawlerWorkers() *globals.DcWorkers {
if globals.DirectoryCrawlers != nil { if globals.DirectoryCrawlers != nil {
panic("DirectoryCrawlers has already been defined!") panic("DirectoryCrawlers has already been defined!")
} }
workers := Workers.InitializeWorkers(directoryCrawlerWorker) workers := workers.InitializeWorkers(directoryCrawlerWorker)
d := &globals.DcWorkers{} d := &globals.DcWorkers{}
d.Queue = workers.Queue d.Queue = workers.Queue
d.BusyWorkers = workers.BusyWorkers d.BusyWorkers = workers.BusyWorkers
@ -20,7 +20,7 @@ func InitializeDirectoryCrawlerWorkers() *globals.DcWorkers {
return d return d
} }
func directoryCrawlerWorker(w *Workers.CrawlWorkers) { func directoryCrawlerWorker(w *workers.CrawlWorkers) {
for { for {
job := w.Queue.GetJob() job := w.Queue.GetJob()
atomic.AddInt32(&w.BusyWorkers, 1) atomic.AddInt32(&w.BusyWorkers, 1)

View File

@ -1,11 +1,11 @@
package Crawlers package crawlers
import ( import (
"crazyfs/SharedCache"
"crazyfs/Workers"
"crazyfs/config" "crazyfs/config"
"crazyfs/elastic" "crazyfs/elastic"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/sharedcache"
"crazyfs/workers"
"sync/atomic" "sync/atomic"
) )
@ -13,7 +13,7 @@ func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
if globals.ElasticCrawlers != nil { if globals.ElasticCrawlers != nil {
panic("ElasticCrawlers has already been defined!") panic("ElasticCrawlers has already been defined!")
} }
workers := Workers.InitializeWorkers(elasticDeleteWorker) workers := workers.InitializeWorkers(elasticDeleteWorker)
d := &globals.DcWorkers{} d := &globals.DcWorkers{}
d.Queue = workers.Queue d.Queue = workers.Queue
d.BusyWorkers = workers.BusyWorkers d.BusyWorkers = workers.BusyWorkers
@ -22,12 +22,12 @@ func InitializeElasticCrawlerWorkers() *globals.DcWorkers {
return d return d
} }
func elasticDeleteWorker(w *Workers.CrawlWorkers) { func elasticDeleteWorker(w *workers.CrawlWorkers) {
for { for {
job := w.Queue.GetJob() job := w.Queue.GetJob()
atomic.AddInt32(&w.BusyWorkers, 1) atomic.AddInt32(&w.BusyWorkers, 1)
if _, ok := SharedCache.Cache.Get(job.StartPath); !ok { if _, ok := sharedcache.Cache.Get(job.StartPath); !ok {
// If a key in Elastic does not exist in the LRU cache, delete it from Elastic. // If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
e := *job.Extra e := *job.Extra
key := e["key"].(string) key := e["key"].(string)

View File

@ -1,4 +1,4 @@
package Crawlers package crawlers
import ( import (
"crazyfs/logging" "crazyfs/logging"

View File

@ -1,13 +1,13 @@
package main package main
import ( import (
"crazyfs/Crawlers"
"crazyfs/SharedCache"
"crazyfs/api" "crazyfs/api"
"crazyfs/cache" "crazyfs/cache"
"crazyfs/config" "crazyfs/config"
"crazyfs/crawlers"
"crazyfs/elastic" "crazyfs/elastic"
"crazyfs/logging" "crazyfs/logging"
"crazyfs/sharedcache"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
@ -105,7 +105,7 @@ func main() {
log.Fatalf("Failed to load config file: %s", err) log.Fatalf("Failed to load config file: %s", err)
} }
err = SharedCache.NewCache(cfg.CacheSize) err = sharedcache.NewCache(cfg.CacheSize)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -129,7 +129,7 @@ func main() {
elastic.Enabled = false elastic.Enabled = false
} else { } else {
elastic.ElasticClient = es elastic.ElasticClient = es
Crawlers.InitializeElasticCrawlerWorkers() crawlers.InitializeElasticCrawlerWorkers()
// This could take a minute, so we do this in the background while we crawl. // This could take a minute, so we do this in the background while we crawl.
go func() { go func() {
elastic.EnableElasticsearchConnection() elastic.EnableElasticsearchConnection()
@ -151,7 +151,7 @@ func main() {
log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable && !cliArgs.disableElasticSync) log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable && !cliArgs.disableElasticSync)
Crawlers.InitializeDirectoryCrawlerWorkers() crawlers.InitializeDirectoryCrawlerWorkers()
cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit) cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)
@ -170,7 +170,7 @@ func main() {
start := time.Now() start := time.Now()
cache.InitialCrawl() cache.InitialCrawl()
duration := time.Since(start).Round(time.Second) duration := time.Since(start).Round(time.Second)
keys := SharedCache.Cache.Keys() keys := sharedcache.Cache.Keys()
config.InitialCrawlElapsed = int(duration.Seconds()) config.InitialCrawlElapsed = int(duration.Seconds())
log.Infof("Initial crawl completed in %s. %d items added to the cache.", duration, len(keys)) log.Infof("Initial crawl completed in %s. %d items added to the cache.", duration, len(keys))
} }

View File

@ -1,10 +1,10 @@
package DirectoryCrawler package directorycrawler
import ( import (
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/SharedCache"
"crazyfs/Walk"
"crazyfs/file" "crazyfs/file"
"crazyfs/queuedwalk"
"crazyfs/sharedcache"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -36,10 +36,10 @@ type FinishedCrawl struct {
type DirectoryCrawler struct { type DirectoryCrawler struct {
visited sync.Map visited sync.Map
wg sync.WaitGroup wg sync.WaitGroup
queue *Walk.JobQueue queue *queuedwalk.JobQueue
} }
func NewDirectoryCrawler(queue *Walk.JobQueue) *DirectoryCrawler { func NewDirectoryCrawler(queue *queuedwalk.JobQueue) *DirectoryCrawler {
return &DirectoryCrawler{ return &DirectoryCrawler{
visited: sync.Map{}, visited: sync.Map{},
queue: queue, queue: queue,
@ -50,7 +50,7 @@ func (dc *DirectoryCrawler) CleanupDeletedFiles(path string) {
dc.visited.Range(func(key, value interface{}) bool { dc.visited.Range(func(key, value interface{}) bool {
keyStr := key.(string) keyStr := key.(string)
if isSubpath(file.StripRootDir(path), keyStr) && value.(bool) { if isSubpath(file.StripRootDir(path), keyStr) && value.(bool) {
SharedCache.Cache.Remove(keyStr) sharedcache.Cache.Remove(keyStr)
} }
return true return true
}) })
@ -58,10 +58,10 @@ func (dc *DirectoryCrawler) CleanupDeletedFiles(path string) {
func (dc *DirectoryCrawler) AddCacheItem(fullPath string, info os.FileInfo) { func (dc *DirectoryCrawler) AddCacheItem(fullPath string, info os.FileInfo) {
strippedPath := file.StripRootDir(fullPath) strippedPath := file.StripRootDir(fullPath)
item := CacheItem.NewItem(fullPath, info) item := cacheitem.NewItem(fullPath, info)
if item != nil { if item != nil {
// Sometimes CacheItem.NewItem will return nil if the path fails its checks // Sometimes cacheitem.NewItem will return nil if the path fails its checks
SharedCache.Cache.Add(strippedPath, item) sharedcache.Cache.Add(strippedPath, item)
} else { } else {
//log.Errorf("NewItem returned nil for %s", fullPath) //log.Errorf("NewItem returned nil for %s", fullPath)
} }

View File

@ -1,11 +1,11 @@
package DirectoryCrawler package directorycrawler
import ( import (
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/SharedCache"
"crazyfs/Walk"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"crazyfs/queuedwalk"
"crazyfs/sharedcache"
"errors" "errors"
"fmt" "fmt"
"os" "os"
@ -76,12 +76,12 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
//relPath := file.StripRootDir(fullPath) //relPath := file.StripRootDir(fullPath)
//SharedCache.Cache.Remove(relPath) //sharedcache.Cache.Remove(relPath)
if info.IsDir() { if info.IsDir() {
// Get a list of all keys in the cache that belong to this directory // Get a list of all keys in the cache that belong to this directory
keys := make([]string, 0) keys := make([]string, 0)
for _, key := range SharedCache.Cache.Keys() { for _, key := range sharedcache.Cache.Keys() {
if isSubpath(fullPath, key) { if isSubpath(fullPath, key) {
keys = append(keys, key) keys = append(keys, key)
} }
@ -89,11 +89,11 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
// Remove all entries in the cache that belong to this directory, so we can start fresh. // Remove all entries in the cache that belong to this directory, so we can start fresh.
for _, key := range keys { for _, key := range keys {
SharedCache.Cache.Remove(key) sharedcache.Cache.Remove(key)
} }
// If the path is a directory, start a walk // If the path is a directory, start a queuedwalk
err := Walk.Walk(fullPath, config.FollowSymlinks, walkFunc, dc.queue) err := queuedwalk.Walk(fullPath, config.FollowSymlinks, walkFunc, dc.queue)
if err != nil { if err != nil {
log.Errorf(`CRAWLER:Crawl - Crawl for "%s" failed: %s`, fullPath, err) log.Errorf(`CRAWLER:Crawl - Crawl for "%s" failed: %s`, fullPath, err)
} }
@ -105,7 +105,7 @@ func (dc *DirectoryCrawler) Crawl(fullPath string, walkFunc func(string, os.File
} }
// CrawlNoRecursion this function crawls a file or directory and does not recurse into any subdirectories. Also returns the result of the crawl. // CrawlNoRecursion this function crawls a file or directory and does not recurse into any subdirectories. Also returns the result of the crawl.
func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item, error) { func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*cacheitem.Item, error) {
file.RetardCheck(fullPath) file.RetardCheck(fullPath)
// TODO: check if symlink and reject if it is // TODO: check if symlink and reject if it is
@ -132,15 +132,15 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
// return nil, errors.New(msg) // return nil, errors.New(msg)
//} //}
var item *CacheItem.Item var item *cacheitem.Item
relPath := file.StripRootDir(fullPath) relPath := file.StripRootDir(fullPath)
SharedCache.Cache.Remove(relPath) sharedcache.Cache.Remove(relPath)
if info.IsDir() { if info.IsDir() {
// Get a list of all keys in the cache that belong to this directory. // Get a list of all keys in the cache that belong to this directory.
keys := make([]string, 0) keys := make([]string, 0)
for _, key := range SharedCache.Cache.Keys() { for _, key := range sharedcache.Cache.Keys() {
if isSubpath(fullPath, key) { if isSubpath(fullPath, key) {
keys = append(keys, key) keys = append(keys, key)
} }
@ -148,7 +148,7 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
// Remove all entries in the cache that belong to this directory, so we can start fresh. // Remove all entries in the cache that belong to this directory, so we can start fresh.
for _, key := range keys { for _, key := range keys {
SharedCache.Cache.Remove(key) sharedcache.Cache.Remove(key)
} }
err := filepath.WalkDir(fullPath, dc.walkNonRecursiveFunc) err := filepath.WalkDir(fullPath, dc.walkNonRecursiveFunc)
@ -156,9 +156,9 @@ func (dc *DirectoryCrawler) CrawlNoRecursion(fullPath string) (*CacheItem.Item,
log.Errorf(`CRAWLER:CrawlNoRecursion - Crawl for "%s" failed: %s`, fullPath, err) log.Errorf(`CRAWLER:CrawlNoRecursion - Crawl for "%s" failed: %s`, fullPath, err)
return nil, err return nil, err
} }
item, _ = SharedCache.Cache.Get(relPath) item, _ = sharedcache.Cache.Get(relPath)
} else { } else {
item = CacheItem.NewItem(fullPath, info) item = cacheitem.NewItem(fullPath, info)
dc.AddCacheItem(fullPath, info) dc.AddCacheItem(fullPath, info)
} }
return item, nil return item, nil

View File

@ -1,4 +1,4 @@
package DirectoryCrawler package directorycrawler
import ( import (
"crazyfs/logging" "crazyfs/logging"

View File

@ -1,10 +1,10 @@
package DirectoryCrawler package directorycrawler
import ( import (
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/SharedCache"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"crazyfs/sharedcache"
"os" "os"
"path/filepath" "path/filepath"
) )
@ -15,7 +15,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
dc.visited.Store(relPath, true) dc.visited.Store(relPath, true)
if info.Mode().IsDir() { if info.Mode().IsDir() {
dirItem := CacheItem.NewItem(fullPath, info) dirItem := cacheitem.NewItem(fullPath, info)
children, err := os.ReadDir(fullPath) children, err := os.ReadDir(fullPath)
if err != nil { if err != nil {
@ -29,7 +29,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
} }
// Add the directory to the cache after all of its children have been processed // Add the directory to the cache after all of its children have been processed
SharedCache.Cache.Add(relPath, dirItem) sharedcache.Cache.Add(relPath, dirItem)
// If the directory is not the root directory, update the parent directory's Children field // If the directory is not the root directory, update the parent directory's Children field
// This block of code ensures that the parent directory's Children field is always up-to-date with // This block of code ensures that the parent directory's Children field is always up-to-date with
@ -38,7 +38,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
if fullPath != config.GetConfig().RootDir { if fullPath != config.GetConfig().RootDir {
parentDir := filepath.Dir(fullPath) parentDir := filepath.Dir(fullPath)
strippedParentDir := file.StripRootDir(parentDir) strippedParentDir := file.StripRootDir(parentDir)
parentItem, found := SharedCache.Cache.Get(strippedParentDir) parentItem, found := sharedcache.Cache.Get(strippedParentDir)
if found { if found {
// Remove the old version of the directory from the parent's Children field // Remove the old version of the directory from the parent's Children field
newChildren, _ := removeOldDir(parentItem.Children, relPath) newChildren, _ := removeOldDir(parentItem.Children, relPath)
@ -47,7 +47,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
parentItem.Children = append(newChildren, relPath) parentItem.Children = append(newChildren, relPath)
// Update the parent directory in the cache // Update the parent directory in the cache
SharedCache.Cache.Add(strippedParentDir, parentItem) sharedcache.Cache.Add(strippedParentDir, parentItem)
} }
} }
} else { } else {

View File

@ -1,8 +1,8 @@
package elastic package elastic
import ( import (
"crazyfs/DirectoryCrawler"
"crazyfs/config" "crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals" "crazyfs/globals"
"sync" "sync"
"time" "time"
@ -80,7 +80,7 @@ func syncElasticsearch(doFullSync bool) {
startRemoveStaleItemsFromElasticsearch(globalPathsByKey) startRemoveStaleItemsFromElasticsearch(globalPathsByKey)
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) // TODO: replace with proper elastic queue dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) // TODO: replace with proper elastic queue
err = dc.Crawl(config.GetConfig().RootDir, addToElasticsearch) err = dc.Crawl(config.GetConfig().RootDir, addToElasticsearch)
if err != nil { if err != nil {
log.Errorf("ELASTIC - Crawl failed: %s", err) log.Errorf("ELASTIC - Crawl failed: %s", err)

View File

@ -3,10 +3,10 @@ package elastic
import ( import (
"bytes" "bytes"
"context" "context"
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/SharedCache"
"crazyfs/config" "crazyfs/config"
"crazyfs/file" "crazyfs/file"
"crazyfs/sharedcache"
"encoding/json" "encoding/json"
"github.com/elastic/go-elasticsearch/v8/esapi" "github.com/elastic/go-elasticsearch/v8/esapi"
"os" "os"
@ -24,7 +24,7 @@ var fullSync bool
func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error { func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
relPath := file.StripRootDir(fullPath) relPath := file.StripRootDir(fullPath)
if !shouldExclude(relPath, config.GetConfig().ElasticsearchExcludePatterns) { if !shouldExclude(relPath, config.GetConfig().ElasticsearchExcludePatterns) {
cacheItem, found := SharedCache.Cache.Get(relPath) cacheItem, found := sharedcache.Cache.Get(relPath)
if !found { if !found {
// This sort of thing can happen if new files have been added on disk but a scan has not been run to refresh the cache. // This sort of thing can happen if new files have been added on disk but a scan has not been run to refresh the cache.
log.Debugf(`ELASTIC:Add - Path "%s" exists on disk, but not in the LRU cache. Deleting from Elastic.`, relPath) log.Debugf(`ELASTIC:Add - Path "%s" exists on disk, but not in the LRU cache. Deleting from Elastic.`, relPath)
@ -47,7 +47,7 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
return nil return nil
} }
func preformAddToElasticsearch(item *CacheItem.Item) { func preformAddToElasticsearch(item *cacheitem.Item) {
preparedItem, err := prepareCacheItem(item) preparedItem, err := prepareCacheItem(item)
if err != nil { if err != nil {
log.Printf("ELASTIC:Add - Error preparing new item: %s", err) log.Printf("ELASTIC:Add - Error preparing new item: %s", err)
@ -85,8 +85,8 @@ func preformAddToElasticsearch(item *CacheItem.Item) {
} }
// prepareCacheItem is used to get an item ready to insert into Elastic. // prepareCacheItem is used to get an item ready to insert into Elastic.
func prepareCacheItem(item *CacheItem.Item) (*CacheItem.Item, error) { func prepareCacheItem(item *cacheitem.Item) (*cacheitem.Item, error) {
resolvedItem := CacheItem.Item{ resolvedItem := cacheitem.Item{
Path: item.Path, Path: item.Path,
Name: item.Name, Name: item.Name,
Size: item.Size, Size: item.Size,

View File

@ -2,9 +2,9 @@ package elastic
import ( import (
"context" "context"
"crazyfs/Walk"
"crazyfs/config" "crazyfs/config"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/queuedwalk"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -23,7 +23,7 @@ func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) {
// For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache). // For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache).
for path, key := range pathsByKey { for path, key := range pathsByKey {
job := Walk.Job{ job := queuedwalk.Job{
StartPath: path, StartPath: path,
} }
extra := make(map[string]interface{}) extra := make(map[string]interface{})

View File

@ -1,7 +1,7 @@
package globals package globals
import ( import (
"crazyfs/Walk" "crazyfs/queuedwalk"
) )
var ElasticCrawlers *DcWorkers var ElasticCrawlers *DcWorkers
@ -9,6 +9,6 @@ var ElasticCrawlers *DcWorkers
var DirectoryCrawlers *DcWorkers var DirectoryCrawlers *DcWorkers
type DcWorkers struct { type DcWorkers struct {
Queue *Walk.JobQueue Queue *queuedwalk.JobQueue
BusyWorkers int32 BusyWorkers int32
} }

View File

@ -1,4 +1,4 @@
package Walk package queuedwalk
import ( import (
"crazyfs/file" "crazyfs/file"

View File

@ -1,4 +1,4 @@
package Walk package queuedwalk
import ( import (
"github.com/enriquebris/goconcurrentqueue" "github.com/enriquebris/goconcurrentqueue"
@ -7,7 +7,7 @@ import (
// This is a queue implementation that doesn't rely on channels. This way of doing things should be more memory-efficient. // This is a queue implementation that doesn't rely on channels. This way of doing things should be more memory-efficient.
// Job is an individual job passed to the Workers. // Job is an individual job passed to the workers.
type Job struct { type Job struct {
StartPath string StartPath string
Walker *Walker // A pointer to the shared Walker object is passed as well. Walker *Walker // A pointer to the shared Walker object is passed as well.

View File

@ -1,4 +1,4 @@
package Walk package queuedwalk
import ( import (
"errors" "errors"
@ -7,7 +7,7 @@ import (
"sync" "sync"
) )
// Walker.go is the implementation behind `Walk()`, which is a filesystem walk // Walker.go is the implementation behind `queuedwalk()`, which is a filesystem queuedwalk
// using workers that pull jobs from a queue. // using workers that pull jobs from a queue.
// ErrNotDir indicates that the path, which is being passed to a walker function, does not point to a directory. // ErrNotDir indicates that the path, which is being passed to a walker function, does not point to a directory.
@ -70,7 +70,7 @@ func (w *Walker) ProcessPath(relPath string) error {
} }
// Walk recursively descends into subdirectories, calling the user-defined walkFn for each file or directory // Walk recursively descends into subdirectories, calling the user-defined walkFn for each file or directory
// in the tree, starting with the root directory. It is only called one place: `Walk()` in Walk.go // in the tree, starting with the root directory. It is only called one place: `queuedwalk()` in queuedwalk.go
func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error { func (w *Walker) Walk(relPath string, walkFn filepath.WalkFunc) error {
// TODO: compare with filepath.WalkDir() // TODO: compare with filepath.WalkDir()

View File

@ -1,4 +1,4 @@
package Walk package queuedwalk
import ( import (
"os" "os"

View File

@ -1,4 +1,4 @@
package Walk package queuedwalk
import ( import (
"crazyfs/logging" "crazyfs/logging"

View File

@ -1,12 +1,12 @@
package ResponseItem package responseitem
import ( import (
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/DirectoryCrawler"
"crazyfs/SharedCache"
"crazyfs/config" "crazyfs/config"
"crazyfs/directorycrawler"
"crazyfs/globals" "crazyfs/globals"
"crazyfs/logging" "crazyfs/logging"
"crazyfs/sharedcache"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"path/filepath" "path/filepath"
) )
@ -18,7 +18,7 @@ func init() {
} }
// ResponseItem is what is returned by the HTTP API as a JSON object. // ResponseItem is what is returned by the HTTP API as a JSON object.
// We don't return a `CacheItem.Item` because having a separate `ResponseItem` // We don't return a `cacheitem.Item` because having a separate `responseitem`
// object allows us to customize the structure without messing with the original item. // object allows us to customize the structure without messing with the original item.
type ResponseItem struct { type ResponseItem struct {
Path string `json:"path"` Path string `json:"path"`
@ -35,24 +35,24 @@ type ResponseItem struct {
Cached int64 `json:"cached"` Cached int64 `json:"cached"`
} }
func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem { func NewResponseItem(cacheItem *cacheitem.Item) *ResponseItem {
// TODO: this should never happen and can probably be removed. // TODO: this should never happen and can probably be removed.
// Problem was linked to the scenario where an item was not found in the cache // Problem was linked to the scenario where an item was not found in the cache
// so a new crawl was triggered but the `childItem` var was never updated. // so a new crawl was triggered but the `childItem` var was never updated.
//defer func() { //defer func() {
// if r := recover(); r != nil { // if r := recover(); r != nil {
// copiedItem := &CacheItem.Item{ // copiedItem := &cacheitem.Item{
// Path: cacheItem.Path, // Path: cacheitem.Path,
// Name: cacheItem.Name, // Name: cacheitem.Name,
// Size: cacheItem.Size, // Size: cacheitem.Size,
// Extension: cacheItem.Extension, // Extension: cacheitem.Extension,
// Modified: cacheItem.Modified, // Modified: cacheitem.Modified,
// Mode: cacheItem.Mode, // Mode: cacheitem.Mode,
// IsDir: cacheItem.IsDir, // IsDir: cacheitem.IsDir,
// IsSymlink: cacheItem.IsSymlink, // IsSymlink: cacheitem.IsSymlink,
// Cached: cacheItem.Cached, // Cached: cacheitem.Cached,
// Children: nil, // Children: nil,
// MimeType: cacheItem.MimeType, // MimeType: cacheitem.MimeType,
// } // }
// log.Fatalf("Recovered from panic: %s - %+v - %s", r, copiedItem, debug.Stack()) // log.Fatalf("Recovered from panic: %s - %+v - %s", r, copiedItem, debug.Stack())
// } // }
@ -75,7 +75,7 @@ func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem {
if len(cacheItem.Children) > 0 { // avoid a null entry for the children key in the JSON. if len(cacheItem.Children) > 0 { // avoid a null entry for the children key in the JSON.
var children []*ResponseItem var children []*ResponseItem
for _, child := range cacheItem.Children { for _, child := range cacheItem.Children {
childItem, found := SharedCache.Cache.Get(child) childItem, found := sharedcache.Cache.Get(child)
if !found { if !found {
// If the path wasn't found, do a quick crawl since the path could have been modified, since the last crawl. // If the path wasn't found, do a quick crawl since the path could have been modified, since the last crawl.
@ -85,7 +85,7 @@ func NewResponseItem(cacheItem *CacheItem.Item) *ResponseItem {
crawlRelPath := filepath.Join(config.GetConfig().RootDir, child) crawlRelPath := filepath.Join(config.GetConfig().RootDir, child)
// TODO: when does this get triggered? // TODO: when does this get triggered?
log.Debugf(`NewResponseItem:Crawl - Not in cache, crawling: "%s" ("%s")`, child, crawlRelPath) log.Debugf(`NewResponseItem:Crawl - Not in cache, crawling: "%s" ("%s")`, child, crawlRelPath)
dc := DirectoryCrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue) dc := directorycrawler.NewDirectoryCrawler(globals.DirectoryCrawlers.Queue)
item, err := dc.CrawlNoRecursion(crawlRelPath) item, err := dc.CrawlNoRecursion(crawlRelPath)
if err != nil { if err != nil {
log.Errorf("NewResponseItem:Crawl - %s", err) log.Errorf("NewResponseItem:Crawl - %s", err)

View File

@ -1,15 +1,13 @@
package SharedCache package sharedcache
import ( import (
"crazyfs/CacheItem" "crazyfs/cacheitem"
"crazyfs/logging" "crazyfs/logging"
"errors"
lru "github.com/hashicorp/golang-lru/v2" lru "github.com/hashicorp/golang-lru/v2"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
var Cache *lru.Cache[string, *CacheItem.Item] var Cache *lru.Cache[string, *cacheitem.Item]
var cacheCreated bool
var log *logrus.Logger var log *logrus.Logger
func init() { func init() {
@ -17,14 +15,13 @@ func init() {
} }
func NewCache(size int) error { func NewCache(size int) error {
if cacheCreated { if Cache != nil {
return errors.New("cache has already been created") panic("cache has already been created")
} }
cache, err := lru.New[string, *CacheItem.Item](size) cache, err := lru.New[string, *cacheitem.Item](size)
if err != nil { if err != nil {
return err return err
} }
Cache = cache Cache = cache
cacheCreated = true
return nil return nil
} }

View File

@ -1,4 +1,4 @@
package Workers package workers
import ( import (
"crazyfs/logging" "crazyfs/logging"

View File

@ -1,26 +1,26 @@
package Workers package workers
import ( import (
"crazyfs/Walk"
"crazyfs/config" "crazyfs/config"
"crazyfs/queuedwalk"
) )
// worker.go holds the worker function for `Walk()`. // worker.go holds the worker function for `queuedwalk()`.
type CrawlWorkerFunc func(workerData *CrawlWorkers) type CrawlWorkerFunc func(workerData *CrawlWorkers)
type CrawlWorkers struct { type CrawlWorkers struct {
Queue *Walk.JobQueue Queue *queuedwalk.JobQueue
BusyWorkers int32 BusyWorkers int32
WorkerFunc CrawlWorkerFunc WorkerFunc CrawlWorkerFunc
} }
// InitializeWorkers starts the number of Workers defined by the config. // InitializeWorkers starts the number of workers defined by the config.
func InitializeWorkers(workerFunc CrawlWorkerFunc) *CrawlWorkers { func InitializeWorkers(workerFunc CrawlWorkerFunc) *CrawlWorkers {
w := &CrawlWorkers{ w := &CrawlWorkers{
WorkerFunc: workerFunc, WorkerFunc: workerFunc,
} }
w.Queue = Walk.NewJobQueue() w.Queue = queuedwalk.NewJobQueue()
for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ { for n := 1; n <= config.GetConfig().DirectoryCrawlers; n++ {
go w.WorkerFunc(w) go w.WorkerFunc(w)
} }