fix download encoding, redo config passing,

This commit is contained in:
Cyberes 2023-12-11 15:29:34 -07:00
parent 4b9c1ba91a
commit 634f3eb8ea
39 changed files with 319 additions and 350 deletions

View File

@ -1,16 +1,3 @@
TODO: add a "last modified" to "sort"
in <https://chub-archive.evulid.cc/api/file/list?path=/chub.ai/characters&page=1&limit=50&sort=folders>
TODO: add an admin endpoint to fetch the last n modified files. Maybe store files update time in elasticsearch?
TODO: fix the 3 loading placeholders
TODO: <https://github.com/victorspringer/http-cache>
TODO: fix encoding on https://chub-archive.evulid.cc/api/file/download?path=/other/takeout/part1.md
TODO: fix /api/file/download when an item is in the cache but does not exist on the disk
# crazy-file-server
*A heavy-duty web file browser for CRAZY files.*
@ -28,23 +15,18 @@ files stored in a very complicated directory tree in just 5 minutes.
## Features
- Automated cache management
- Optionally fill the cache on server start, or as requests come in.
- Watch for changes or scan interval.
- Automated cache management. Fill the cache when the starts, or as requests come in.
- File browsing API.
- Download API.
- Restrict certain files and directories from the download API to prevent users from downloading your entire 100GB+
dataset.
- Frontend-agnostic design. You can have it serve a simple web interface or just act as a JSON API and serve files.
- Simple resources. The resources for the frontend aren't compiled into the binary which allows you to modify or even
replace it.
- Basic searching.
- Elasticsearch integration (to do).
- Frontend-agnostic design.
- Basic searching or Elasticsearch integration.
## Install
1. Install Go.
2. Download the binary or do `cd src && go mod tidy && go build`.
2. Download the binary or do `cd src && go mod tidy && go build`
## Use
@ -54,8 +36,7 @@ files stored in a very complicated directory tree in just 5 minutes.
By default, it looks for your config in the same directory as the executable: `./config.yml` or `./config.yaml`.
If you're using initial cache and have tons of files to scan you'll need at least 5GB of RAM and will have to wait 10 or
so minutes for it to traverse the directory structure. CrazyFS is heavily threaded so you'll want at least an 8-core
so minutes for it to traverse the directory structure. CrazyFS is heavily threaded, so you'll want at least an 8-core
machine.
The search endpoint searches through the cached files. If they aren't cached, they won't be found. Enable pre-cache at
startup to cache everything.
CrazyFS works great with an HTTP cache in front of it.

View File

@ -10,12 +10,12 @@ import (
)
func NewItem(fullPath string, info os.FileInfo) *Item {
if !strings.HasPrefix(fullPath, config.RootDir) {
if !strings.HasPrefix(fullPath, config.GetConfig().RootDir) {
// Retard check
log.Fatalf("NewItem was not passed an absolute path. The path must start with the RootDir: %s", fullPath)
}
if config.CachePrintNew {
if config.GetConfig().CachePrintNew {
log.Debugf("CACHE - new: %s", fullPath)
}
@ -31,6 +31,7 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
}
var mimeType string
var encoding string
var ext string
var err error
if !info.IsDir() {
@ -40,17 +41,26 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
} else {
mimePath = fullPath
}
if config.CrawlerParseMIME {
if config.GetConfig().CrawlerParseMIME {
_, mimeType, ext, err = file.GetMimeType(mimePath, true, &info)
} else {
_, mimeType, ext, err = file.GetMimeType(mimePath, false, &info)
}
if config.GetConfig().CrawlerParseEncoding {
encoding, err = file.DetectFileEncoding(fullPath)
if err != nil {
log.Warnf("ITEM - Error detecting file encoding of file %s - %v", fullPath, err)
encoding = "utf-8" // fall back to utf-8
}
}
if os.IsNotExist(err) {
log.Warnf("Path does not exist: %s", fullPath)
return nil
} else if err != nil {
log.Warnf("Error detecting MIME type: %v", err)
log.Warnf("Error detecting MIME type of file %s - %v", fullPath, err)
}
}
@ -74,7 +84,8 @@ func NewItem(fullPath string, info os.FileInfo) *Item {
IsSymlink: info.Mode()&os.ModeSymlink != 0,
Cached: time.Now().UnixNano() / int64(time.Millisecond), // Set the created time to now in milliseconds
Children: make([]string, 0),
Type: mimeTypePtr,
MimeType: mimeTypePtr,
Encoding: &encoding,
}
}
@ -87,7 +98,8 @@ type Item struct {
Mode uint32 `json:"mode"`
IsDir bool `json:"isDir"`
IsSymlink bool `json:"isSymlink"`
Type *string `json:"type"`
MimeType *string `json:"type"`
Encoding *string `json:"encoding"`
Children []string `json:"children"`
Content string `json:"content,omitempty"`
Cached int64 `json:"cached"`

View File

@ -43,7 +43,7 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
IsSymlink: cacheItem.IsSymlink,
Cached: cacheItem.Cached,
Children: make([]*CacheItem.Item, len(cacheItem.Children)),
Type: cacheItem.Type,
Type: cacheItem.MimeType,
}
// Grab the children from the cache and add them to this new item
@ -59,7 +59,7 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
log.Debugf("CRAWLER - %s not in cache, crawling", child)
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
item, err := dc.CrawlNoRecursion(filepath.Join(config.RootDir, child))
item, err := dc.CrawlNoRecursion(filepath.Join(config.GetConfig().RootDir, child))
if err != nil {
log.Errorf("NewResponseItem - CrawlNoRecursion - %s", err)
@ -82,7 +82,7 @@ func NewResponseItem(cacheItem *CacheItem.Item, sharedCache *lru.Cache[string, *
IsSymlink: childItem.IsSymlink,
Cached: childItem.Cached,
Children: nil,
Type: childItem.Type,
MimeType: childItem.MimeType,
}
children = append(children, copiedChildItem)
}

View File

@ -11,9 +11,9 @@ import (
"net/http"
)
func AdminCacheInfo(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func AdminCacheInfo(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
auth := r.URL.Query().Get("auth")
if auth == "" || auth != cfg.HttpAdminKey {
if auth == "" || auth != config.GetConfig().HttpAdminKey {
helpers.Return403Msg("access denied", w)
return
}
@ -22,7 +22,7 @@ func AdminCacheInfo(w http.ResponseWriter, r *http.Request, cfg *config.Config,
response := map[string]interface{}{
"cache_size": cacheLen,
"cache_max": cfg.CacheSize,
"cache_max": config.GetConfig().CacheSize,
"crawls_running": DirectoryCrawler.GetGlobalActiveCrawls(),
"active_workers": DirectoryCrawler.ActiveWorkers,
"busy_workers": DirectoryCrawler.ActiveWalks,

View File

@ -11,7 +11,7 @@ import (
"net/http"
)
func AdminReCache(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func AdminReCache(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
if r.Method != http.MethodPost {
helpers.Return400Msg("this is a POST endpoint", w)
return
@ -26,7 +26,7 @@ func AdminReCache(w http.ResponseWriter, r *http.Request, cfg *config.Config, sh
}
auth := requestBody["auth"]
if auth == "" || auth != cfg.HttpAdminKey {
if auth == "" || auth != config.GetConfig().HttpAdminKey {
helpers.Return403Msg("access denied", w)
return
}

View File

@ -5,21 +5,18 @@ import (
"crazyfs/api/helpers"
"crazyfs/config"
"crazyfs/file"
"crazyfs/logging"
"fmt"
lru "github.com/hashicorp/golang-lru/v2"
"net/http"
"strings"
)
func Download(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func Download(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
if helpers.CheckInitialCrawl() {
helpers.HandleRejectDuringInitialCrawl(w)
return
}
log := logging.GetLogger()
pathArg := r.URL.Query().Get("path")
if pathArg == "" {
helpers.Return400Msg("missing path", w)
@ -48,7 +45,7 @@ func Download(w http.ResponseWriter, r *http.Request, cfg *config.Config, shared
}
// Multiple files, zip them
helpers.ZipHandlerCompressMultiple(cleanPaths, w, r, cfg, sharedCache)
helpers.ZipHandlerCompressMultiple(cleanPaths, w, r, sharedCache)
return
}
@ -71,45 +68,22 @@ func Download(w http.ResponseWriter, r *http.Request, cfg *config.Config, shared
// Try to get the data from the cache
item, found := sharedCache.Get(relPath)
if !found {
item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, cfg, w)
item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
}
if item == nil {
// The errors have already been handled in handleFileNotFound() so we're good to just exit
return
}
if cfg.HttpAPIDlCacheControl > 0 {
w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d, must-revalidate", cfg.HttpAPIDlCacheControl))
if config.GetConfig().HttpAPIDlCacheControl > 0 {
w.Header().Set("Cache-Control", fmt.Sprintf("public, max-age=%d, must-revalidate", config.GetConfig().HttpAPIDlCacheControl))
} else {
w.Header().Set("Cache-Control", "no-store")
}
if !item.IsDir {
// Get the MIME type of the file
var fileExists bool
var mimeType string
var err error
if item.Type == nil {
fileExists, mimeType, _, err = file.GetMimeType(fullPath, true, nil)
if !fileExists {
helpers.Return400Msg("file not found", w)
}
if err != nil {
log.Warnf("Error detecting MIME type: %v", err)
helpers.Return500Msg(w)
return
}
// GetMimeType() returns an empty string if it was a directory
if mimeType != "" {
// Update the CacheItem's MIME in the sharedCache
item.Type = &mimeType
sharedCache.Add(relPath, item)
}
}
// https://stackoverflow.com/a/57994289
// Only files can have inline disposition, zip archives cannot
// https://stackoverflow.com/a/57994289
contentDownload := r.URL.Query().Get("download")
var disposition string
if contentDownload != "" {
@ -119,8 +93,53 @@ func Download(w http.ResponseWriter, r *http.Request, cfg *config.Config, shared
}
w.Header().Set("Content-Disposition", fmt.Sprintf(`%s; filename="%s"`, disposition, item.Name))
w.Header().Set("Content-Type", mimeType) // Set the content type to the MIME type of the file
http.ServeFile(w, r, fullPath) // Send the file to the client
// Get the MIME type of the file
var mimeType string
var err error
if item.MimeType == nil { // only if the MIME type of this item has not been set yet
_, mimeType, _, err = file.GetMimeType(fullPath, true, nil)
if err != nil {
log.Errorf("Error detecting MIME type: %v", err)
} else if mimeType != "" {
// GetMimeType() returns an empty string if it was a directory.
// Update the CacheItem's MIME in the sharedCache.
item.MimeType = &mimeType
sharedCache.Add(relPath, item)
} else {
log.Errorf("Download.go failed to match a condition when checking a file's MIME - %s", fullPath)
helpers.Return500Msg(w)
}
} else {
mimeType = *item.MimeType
}
// Get the encoding of this file
var encoding string
encoding = "utf-8" // fall back to utf-8
if item.Encoding == nil || *item.Encoding == "" { // only if the encoding of this item has not been set yet
encoding, err = file.DetectFileEncoding(fullPath)
if err != nil {
log.Warnf("Error detecting file encoding: %v", err)
} else {
// Update the object in the cache.
item.Encoding = &encoding
}
} else {
encoding = *item.Encoding
}
if config.GetConfig().HTTPNoMimeSniffHeader {
w.Header().Set("X-Content-Type-Options", "nosniff")
mimeType = file.CastTextMimes(mimeType)
}
// If we were able to find the MIME type and the encoding of the file, set the Content-Type header.
if mimeType != "" && encoding != "" {
w.Header().Set("Content-Type", mimeType+"; charset="+encoding)
}
// Send the file to the client.
http.ServeFile(w, r, fullPath)
} else {
// Stream archive of the directory here
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s.zip"`, item.Name))

View File

@ -4,7 +4,6 @@ import (
"crazyfs/CacheItem"
"crazyfs/cache"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
"encoding/json"
lru "github.com/hashicorp/golang-lru/v2"
"net/http"
@ -12,7 +11,7 @@ import (
// TODO: show the time the initial crawl started
func HealthCheck(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func HealthCheck(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
//log := logging.GetLogger()
response := map[string]interface{}{}

View File

@ -12,7 +12,7 @@ import (
"strconv"
)
func ListDir(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func ListDir(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
if helpers.CheckInitialCrawl() {
helpers.HandleRejectDuringInitialCrawl(w)
return
@ -49,7 +49,7 @@ func ListDir(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedC
// Try to get the data from the cache
cacheItem, found := sharedCache.Get(relPath)
if !found {
cacheItem = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, cfg, w)
cacheItem = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
}
if cacheItem == nil {
return // The errors have already been handled in handleFileNotFound() so we're good to just exit
@ -61,12 +61,12 @@ func ListDir(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedC
// Get the MIME type of the file if the 'mime' argument is present
mime := r.URL.Query().Get("mime")
if mime != "" {
if item.IsDir && !cfg.HttpAllowDirMimeParse {
if item.IsDir && !config.GetConfig().HttpAllowDirMimeParse {
helpers.Return403Msg("not allowed to analyze the mime of directories", w)
return
} else {
// Only update the mime in the cache if it hasn't been set already.
// TODO: need to make sure that when a re-crawl is triggered, the Type is set back to nil
// TODO: need to make sure that when a re-crawl is triggered, the MimeType is set back to nil
if item.Type == nil {
fileExists, mimeType, ext, err := file.GetMimeType(fullPath, true, nil)
if !fileExists {
@ -78,7 +78,7 @@ func ListDir(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedC
return
}
// Update the original cached CacheItem's MIME in the sharedCache
cacheItem.Type = &mimeType
cacheItem.MimeType = &mimeType
cacheItem.Extension = &ext
sharedCache.Add(relPath, cacheItem) // take the address of CacheItem
}

View File

@ -6,6 +6,7 @@ import (
"crazyfs/cache"
"crazyfs/config"
"crazyfs/elastic"
"crazyfs/logging"
"encoding/json"
lru "github.com/hashicorp/golang-lru/v2"
"net/http"
@ -15,7 +16,7 @@ import (
"time"
)
func SearchFile(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func SearchFile(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
if helpers.CheckInitialCrawl() {
helpers.HandleRejectDuringInitialCrawl(w)
return
@ -27,7 +28,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request, cfg *config.Config, shar
return
}
if !cfg.ElasticsearchEnable {
if !config.GetConfig().ElasticsearchEnable {
// If we aren't using Elastic, convert the query to lowercase to reduce the complication.
queryString = strings.ToLower(queryString)
}
@ -68,11 +69,11 @@ func SearchFile(w http.ResponseWriter, r *http.Request, cfg *config.Config, shar
var results []*CacheItem.Item
results = make([]*CacheItem.Item, 0)
if cfg.ElasticsearchEnable {
if config.GetConfig().ElasticsearchEnable {
// Perform the Elasticsearch query
resp, err := elastic.Search(queryString, excludeElements, cfg)
resp, err := elastic.Search(queryString, excludeElements)
if err != nil {
log.Errorf("SEARCH - Failed to perform Elasticsearch query: %s", err)
log.Errorf(`SEARCH - Failed to perform Elasticsearch query "%s" - %s`, queryString, err)
helpers.Return500Msg(w)
return
}
@ -81,7 +82,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request, cfg *config.Config, shar
var respData map[string]interface{}
err = json.NewDecoder(resp.Body).Decode(&respData)
if err != nil {
log.Errorf("SEARCH - Failed to parse Elasticsearch response: %s", err)
log.Errorf(`SEARCH - Failed to parse Elasticsearch response for query "%s" - %s`, queryString, err)
helpers.Return500Msg(w)
return
}
@ -128,7 +129,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request, cfg *config.Config, shar
Mode: uint32(itemSource["mode"].(float64)),
IsDir: itemSource["isDir"].(bool),
IsSymlink: itemSource["isSymlink"].(bool),
Type: itemType,
MimeType: itemType,
Cached: int64(itemSource["cached"].(float64)),
}
items[i] = item
@ -142,7 +143,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request, cfg *config.Config, shar
results = append(results, items...)
}
} else {
results = cache.SearchLRU(queryString, excludeElements, limitResults, sharedCache, cfg)
results = cache.SearchLRU(queryString, excludeElements, limitResults, sharedCache)
}
if folderSorting == "folders" {
@ -152,12 +153,14 @@ func SearchFile(w http.ResponseWriter, r *http.Request, cfg *config.Config, shar
}
searchDuration := time.Since(searchStart).Round(time.Second)
log.Infof("SEARCH - completed in %s and returned %d items", searchDuration, len(results))
log.Debugf(`SEARCH - %s - Query: "%s" - Results: %d - Elapsed: %d`, logging.GetRealIP(r), queryString, len(results), searchDuration)
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(map[string]interface{}{
"results": results,
"results": results,
"numResults": len(results),
"elapsed": searchDuration,
})
if err != nil {
log.Errorf("SEARCH - Failed to serialize JSON: %s", err)

View File

@ -22,17 +22,17 @@ import (
"strings"
)
func Thumbnail(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
if cache.InitialCrawlInProgress && !cfg.HttpAllowDuringInitialCrawl {
func Thumbnail(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
if cache.InitialCrawlInProgress && !config.GetConfig().HttpAllowDuringInitialCrawl {
helpers.HandleRejectDuringInitialCrawl(w)
returnDummyPNG(w)
return
}
log := logging.GetLogger()
relPath := file.StripRootDir(filepath.Join(cfg.RootDir, r.URL.Query().Get("path")))
relPath := file.StripRootDir(filepath.Join(config.GetConfig().RootDir, r.URL.Query().Get("path")))
relPath = strings.TrimSuffix(relPath, "/")
fullPath := filepath.Join(cfg.RootDir, relPath)
fullPath := filepath.Join(config.GetConfig().RootDir, relPath)
// Validate args before doing any operations
width, err := getPositiveIntFromQuery(r, "width")
@ -65,7 +65,7 @@ func Thumbnail(w http.ResponseWriter, r *http.Request, cfg *config.Config, share
// Try to get the data from the cache
item, found := sharedCache.Get(relPath)
if !found {
item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, cfg, w)
item = helpers.HandleFileNotFound(relPath, fullPath, sharedCache, w)
}
if item == nil {
returnDummyPNG(w)
@ -89,7 +89,7 @@ func Thumbnail(w http.ResponseWriter, r *http.Request, cfg *config.Config, share
return
}
// Update the CacheItem's MIME in the sharedCache
item.Type = &mimeType
item.MimeType = &mimeType
item.Extension = &ext
sharedCache.Add(relPath, item)

View File

@ -4,7 +4,6 @@ import (
"crazyfs/CacheItem"
"crazyfs/cache"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
"encoding/json"
lru "github.com/hashicorp/golang-lru/v2"
"net/http"
@ -12,7 +11,7 @@ import (
// TODO: show the time the initial crawl started
func ClientHealthCheck(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func ClientHealthCheck(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
response := map[string]interface{}{}
response["scan_running"] = DirectoryCrawler.GetGlobalActiveCrawls() > 0

View File

@ -8,9 +8,9 @@ import (
"net/http"
)
func RestrictedDownloadDirectories(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func RestrictedDownloadDirectories(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
response := map[string]interface{}{
"restricted_download_directories": config.RestrictedDownloadPaths,
"restricted_download_directories": config.GetConfig().RestrictedDownloadPaths,
}
w.Header().Set("Content-Type", "application/json")

View File

@ -1,20 +1,21 @@
package helpers
import (
"crazyfs/logging"
"encoding/json"
"net/http"
)
func WriteErrorResponse(json_code, http_code int, msg string, w http.ResponseWriter) {
//log := logging.GetLogger()
//log.Warnln(msg)
func WriteErrorResponse(jsonCode, httpCode int, msg string, w http.ResponseWriter) {
log := logging.GetLogger()
log.Warnln(msg)
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http_code)
w.WriteHeader(httpCode)
err := json.NewEncoder(w).Encode(map[string]interface{}{
"code": json_code,
"code": jsonCode,
"error": msg,
})
@ -24,6 +25,7 @@ func WriteErrorResponse(json_code, http_code int, msg string, w http.ResponseWri
}
func ReturnFake404Msg(msg string, w http.ResponseWriter) {
log.Fatalf(msg)
WriteErrorResponse(404, http.StatusBadRequest, msg, w)
}

View File

@ -14,7 +14,7 @@ import (
)
// HandleFileNotFound if the data is not in the cache, start a new crawler
func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config, w http.ResponseWriter) *CacheItem.Item {
func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[string, *CacheItem.Item], w http.ResponseWriter) *CacheItem.Item {
log := logging.GetLogger()
//log.Fatalf("CRAWLER - %s not in cache, crawling", fullPath)
@ -84,7 +84,7 @@ func HandleFileNotFound(relPath string, fullPath string, sharedCache *lru.Cache[
Return500Msg(w)
return nil
}
cache.CheckAndRecache(fullPath, cfg, sharedCache)
cache.CheckAndRecache(fullPath, sharedCache)
return item
}
@ -110,11 +110,11 @@ func Max(a, b int) int {
}
func CheckInitialCrawl() bool {
return cache.InitialCrawlInProgress && !config.HttpAllowDuringInitialCrawl
return cache.InitialCrawlInProgress && !config.GetConfig().HttpAllowDuringInitialCrawl
}
func CheckPathRestricted(relPath string) bool {
for _, restrictedPath := range config.RestrictedDownloadPaths {
for _, restrictedPath := range config.GetConfig().RestrictedDownloadPaths {
if restrictedPath == "" {
restrictedPath = "/"
}

View File

@ -2,7 +2,6 @@ package helpers
import (
"crazyfs/CacheItem"
"crazyfs/config"
"crazyfs/file"
lru "github.com/hashicorp/golang-lru/v2"
kzip "github.com/klauspost/compress/zip"
@ -49,7 +48,7 @@ func ZipHandlerCompress(dirPath string, w http.ResponseWriter, r *http.Request)
log.Errorf("ZIPSTREM - failed to close zipwriter: %s", err)
}
}
func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
zipWriter := kzip.NewWriter(w)
// Walk through each file and add it to the zip
for _, fullPath := range paths {
@ -58,7 +57,7 @@ func ZipHandlerCompressMultiple(paths []string, w http.ResponseWriter, r *http.R
// Try to get the data from the cache
item, found := sharedCache.Get(relPath)
if !found {
item = HandleFileNotFound(relPath, fullPath, sharedCache, cfg, w)
item = HandleFileNotFound(relPath, fullPath, sharedCache, w)
}
if item == nil {
// The errors have already been handled in handleFileNotFound() so we're good to just exit

View File

@ -3,7 +3,6 @@ package api
import (
"crazyfs/CacheItem"
"crazyfs/api/client"
"crazyfs/config"
"crazyfs/logging"
"encoding/json"
"fmt"
@ -21,7 +20,7 @@ type Route struct {
type Routes []Route
type AppHandler func(http.ResponseWriter, *http.Request, *config.Config, *lru.Cache[string, *CacheItem.Item])
type AppHandler func(http.ResponseWriter, *http.Request, *lru.Cache[string, *CacheItem.Item])
var routes = Routes{
Route{
@ -104,7 +103,7 @@ func setHeaders(next http.Handler) http.Handler {
})
}
func NewRouter(cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
func NewRouter(sharedCache *lru.Cache[string, *CacheItem.Item]) *mux.Router {
r := mux.NewRouter().StrictSlash(true)
for _, route := range routes {
var handler http.Handler
@ -113,7 +112,7 @@ func NewRouter(cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Ite
currentRoute := route
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
currentRoute.HandlerFunc(w, r, cfg, sharedCache)
currentRoute.HandlerFunc(w, r, sharedCache)
})
handler = setHeaders(handler)
handler = logging.LogRequest(handler)
@ -139,7 +138,7 @@ func NewRouter(cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Ite
}
func wrongMethod(expectedMethod string, next AppHandler) AppHandler {
return func(w http.ResponseWriter, r *http.Request, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
return func(w http.ResponseWriter, r *http.Request, sharedCache *lru.Cache[string, *CacheItem.Item]) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]interface{}{

View File

@ -33,7 +33,7 @@ func (dc *DirectoryCrawler) processPath(fullPath string, info os.FileInfo) error
// This block of code ensures that the parent directory's Children field is always up-to-date with
// the current state of its subdirectories. It removes any old versions of the current directory
// from the parent's Children field and adds the new version.
if fullPath != config.RootDir {
if fullPath != config.GetConfig().RootDir {
parentDir := filepath.Dir(fullPath)
strippedParentDir := file.StripRootDir(parentDir)
parentItem, found := dc.cache.Get(strippedParentDir)

23
src/cache/crawler.go vendored
View File

@ -17,23 +17,23 @@ func init() {
log = logging.GetLogger()
}
func StartCrawler(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config) error {
func StartCrawler(sharedCache *lru.Cache[string, *CacheItem.Item]) error {
var wg sync.WaitGroup
crawlerChan := make(chan struct{}, cfg.DirectoryCrawlers)
crawlerChan := make(chan struct{}, config.GetConfig().DirectoryCrawlers)
go startCrawl(cfg, sharedCache, &wg, crawlerChan)
go startCrawl(sharedCache, &wg, crawlerChan)
ticker := time.NewTicker(60 * time.Second)
go logCacheStatus("CACHE STATUS", ticker, sharedCache, cfg, log.Debugf)
go logCacheStatus("CACHE STATUS", ticker, sharedCache, log.Debugf)
return nil
}
func startCrawl(cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGroup, crawlerChan chan struct{}) {
ticker := time.NewTicker(time.Duration(cfg.CrawlModeCrawlInterval) * time.Second)
func startCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGroup, crawlerChan chan struct{}) {
ticker := time.NewTicker(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
defer ticker.Stop()
time.Sleep(time.Duration(cfg.CrawlModeCrawlInterval) * time.Second)
time.Sleep(time.Duration(config.GetConfig().CrawlModeCrawlInterval) * time.Second)
for range ticker.C {
crawlerChan <- struct{}{}
@ -43,25 +43,24 @@ func startCrawl(cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.It
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
log.Infoln("CRAWLER - Starting a crawl...")
start := time.Now()
err := dc.Crawl(cfg.RootDir, true)
err := dc.Crawl(config.GetConfig().RootDir, true)
duration := time.Since(start).Round(time.Second)
if err != nil {
log.Warnf("CRAWLER - Crawl failed: %s", err)
} else {
log.Infof("CRAWLER - Crawl completed in %s", duration)
log.Debugf("%d/%d items in the cache.", cfg.CacheSize, len(sharedCache.Keys()))
log.Debugf("%d/%d items in the cache.", config.GetConfig().CacheSize, len(sharedCache.Keys()))
}
<-crawlerChan
}()
}
}
func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config, logFn func(format string, args ...interface{})) {
func logCacheStatus(msg string, ticker *time.Ticker, sharedCache *lru.Cache[string, *CacheItem.Item], logFn func(format string, args ...interface{})) {
defer ticker.Stop()
for range ticker.C {
activeWorkers := int(DirectoryCrawler.ActiveWorkers)
busyWorkers := int(DirectoryCrawler.ActiveWalks)
logFn("%s - %d/%d items in the cache. Active workers: %d Active crawls: %d", msg, len(sharedCache.Keys()), cfg.CacheSize, activeWorkers, busyWorkers)
//fmt.Println(sharedCache.Keys())
logFn("%s - %d/%d items in the cache. Active workers: %d Active crawls: %d", msg, len(sharedCache.Keys()), config.GetConfig().CacheSize, activeWorkers, busyWorkers)
}
}

View File

@ -15,18 +15,18 @@ func init() {
InitialCrawlInProgress = false
}
func InitialCrawl(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config) {
func InitialCrawl(sharedCache *lru.Cache[string, *CacheItem.Item]) {
log = logging.GetLogger()
log.Infof("INITIAL CRAWL - starting the crawl for %s", config.RootDir)
log.Infof("INITIAL CRAWL - starting the crawl for %s", config.GetConfig().RootDir)
ticker := time.NewTicker(3 * time.Second)
go logCacheStatus("INITIAL CRAWL", ticker, sharedCache, cfg, log.Infof)
go logCacheStatus("INITIAL CRAWL", ticker, sharedCache, log.Infof)
InitialCrawlInProgress = true
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
//start := time.Now()
err := dc.Crawl(config.RootDir, true)
err := dc.Crawl(config.GetConfig().RootDir, true)
if err != nil {
log.Errorf("LIST - background recursive crawl failed: %s", err)
}

1
src/cache/missing.go vendored Normal file
View File

@ -0,0 +1 @@
package cache

View File

@ -18,9 +18,9 @@ func InitRecacheSemaphore(limit int) {
sem = make(chan struct{}, limit)
}
func CheckAndRecache(path string, cfg *config.Config, sharedCache *lru.Cache[string, *CacheItem.Item]) {
func CheckAndRecache(path string, sharedCache *lru.Cache[string, *CacheItem.Item]) {
item, found := sharedCache.Get(path)
if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(cfg.CacheTime)*60*1000 {
if found && time.Now().UnixNano()/int64(time.Millisecond)-item.Cached > int64(config.GetConfig().CacheTime)*60*1000 {
log := logging.GetLogger()
log.Debugf("Re-caching: %s", path)
sem <- struct{}{} // acquire a token

10
src/cache/search.go vendored
View File

@ -9,7 +9,7 @@ import (
"strings"
)
func SearchLRU(queryString string, excludeElements []string, limitResults int, sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config) []*CacheItem.Item {
func SearchLRU(queryString string, excludeElements []string, limitResults int, sharedCache *lru.Cache[string, *CacheItem.Item]) []*CacheItem.Item {
results := make([]*CacheItem.Item, 0)
const maxGoroutines = 100
@ -20,7 +20,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
resultsChan := make(chan *CacheItem.Item, len(sharedCache.Keys()))
for _, key := range sharedCache.Keys() {
searchKey(key, queryString, excludeElements, sem, resultsChan, sharedCache, cfg)
searchKey(key, queryString, excludeElements, sem, resultsChan, sharedCache)
}
// Wait for all goroutines to finish
@ -32,7 +32,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
item := <-resultsChan
if item != nil {
results = append(results, item)
if (limitResults > 0 && len(results) == limitResults) || len(results) >= cfg.ApiSearchMaxResults {
if (limitResults > 0 && len(results) == limitResults) || len(results) >= config.GetConfig().ApiSearchMaxResults {
break
}
}
@ -41,7 +41,7 @@ func SearchLRU(queryString string, excludeElements []string, limitResults int, s
return results
}
func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *CacheItem.Item, sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config) {
func searchKey(key string, queryString string, excludeElements []string, sem chan struct{}, resultsChan chan *CacheItem.Item, sharedCache *lru.Cache[string, *CacheItem.Item]) {
// Acquire a token
sem <- struct{}{}
@ -87,7 +87,7 @@ func searchKey(key string, queryString string, excludeElements []string, sem cha
resultsChan <- nil
return
}
if !cfg.ApiSearchShowChildren {
if !config.GetConfig().ApiSearchShowChildren {
item.Children = nil // erase the children dict
}
resultsChan <- &item

101
src/cache/watcher.go vendored
View File

@ -1,101 +0,0 @@
package cache
import (
"crazyfs/CacheItem"
"crazyfs/cache/DirectoryCrawler"
"crazyfs/config"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/radovskyb/watcher"
"strings"
"sync"
"time"
)
func StartWatcher(basePath string, sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config) (*watcher.Watcher, error) {
w := watcher.New()
var wg sync.WaitGroup
crawlerChan := make(chan struct{}, cfg.DirectoryCrawlers) // limit to cfg.DirectoryCrawlers concurrent crawlers
go func() {
for {
select {
case event := <-w.Event:
// Ignore events outside of basePath
if !strings.HasPrefix(event.Path, basePath) {
if cfg.CachePrintChanges {
log.Warnf("Ignoring file outside the base path: %s", event.Path)
}
continue
}
if event.Op == watcher.Create {
if cfg.CachePrintChanges {
log.Debugf("WATCHER - File created: %s", event.Path)
}
}
if event.Op == watcher.Write {
if cfg.CachePrintChanges {
log.Debugf("WATCHER - File modified: %s", event.Path)
}
}
if event.Op == watcher.Remove {
if cfg.CachePrintChanges {
log.Debugf("WATCHER - File removed: %s", event.Path)
}
sharedCache.Remove(event.Path) // remove the entry from the cache
continue // skip the rest of the loop for this event
}
if event.Op == watcher.Rename {
if cfg.CachePrintChanges {
log.Debugf("WATCHER- File renamed: %s", event.Path)
}
sharedCache.Remove(event.Path)
continue
}
if event.Op == watcher.Chmod {
if cfg.CachePrintChanges {
log.Debugf("WATCHER - File chmod: %s", event.Path)
}
}
crawlerChan <- struct{}{} // block if there are already 4 crawlers
wg.Add(1)
go func() {
defer wg.Done()
dc := DirectoryCrawler.NewDirectoryCrawler(sharedCache)
err := dc.Crawl(event.Path, true)
if err != nil {
log.Warnf("WATCHER - Crawl failed: %s", err)
}
<-crawlerChan // release
}()
case err := <-w.Error:
log.Errorf("WATCHER - %s", err)
case <-w.Closed:
return
}
}
}()
// Watch test_folder recursively for changes.
if err := w.AddRecursive(basePath); err != nil {
log.Fatalf("WATCHER RECURSIVE): %s", err)
}
go func() {
// Start the watching process - it'll check for changes every 100ms.
if err := w.Start(time.Second * time.Duration(cfg.WatchInterval)); err != nil {
log.Fatalf("WATCHER: %s", err)
}
}()
// Print the filenames of the cache entries every 5 seconds
ticker := time.NewTicker(60 * time.Second)
go func(c *lru.Cache[string, *CacheItem.Item]) {
for range ticker.C {
keys := c.Keys()
log.Debugf("%d items in the cache.", len(keys))
}
}(sharedCache)
return w, nil
}

View File

@ -6,14 +6,15 @@ import (
"strings"
)
// The global, read-only config variable.
var cfg *Config
type Config struct {
RootDir string
HTTPPort string
WatchMode string
CrawlModeCrawlInterval int
DirectoryCrawlers int
CrawlWorkers int
WatchInterval int
CacheSize int
CacheTime int
CachePrintNew bool
@ -21,6 +22,7 @@ type Config struct {
InitialCrawl bool
CacheRecacheCrawlerLimit int
CrawlerParseMIME bool
CrawlerParseEncoding bool
HttpAPIListCacheControl int
HttpAPIDlCacheControl int
HttpAllowDirMimeParse bool
@ -42,9 +44,16 @@ type Config struct {
ElasticsearchAllowConcurrentSyncs bool
ElasticsearchFullSyncOnStart bool
ElasticsearchDefaultQueryField string
HTTPRealIPHeader string
HTTPNoMimeSniffHeader bool
}
func LoadConfig(configFile string) (*Config, error) {
func SetConfig(configFile string) (*Config, error) {
// Only allow the config to be set once.
if cfg != nil {
panic("Config has already been set!")
}
viper.SetConfigFile(configFile)
viper.SetDefault("http_port", "8080")
viper.SetDefault("watch_interval", 1)
@ -59,6 +68,7 @@ func LoadConfig(configFile string) (*Config, error) {
viper.SetDefault("initial_crawl", false)
viper.SetDefault("cache_recache_crawler_limit", 50)
viper.SetDefault("crawler_parse_mime", false)
viper.SetDefault("crawler_parse_encoding", false)
viper.SetDefault("http_api_list_cache_control", 600)
viper.SetDefault("http_api_download_cache_control", 600)
viper.SetDefault("http_allow_dir_mime_parse", true)
@ -80,6 +90,8 @@ func LoadConfig(configFile string) (*Config, error) {
viper.SetDefault("elasticsearch_full_sync_on_start", false)
viper.SetDefault("elasticsearch_query_fields", []string{"extension", "name", "path", "type", "size", "isDir"})
viper.SetDefault("elasticsearch_default_query_field", "name")
viper.SetDefault("http_real_ip_header", "X-Forwarded-For")
viper.SetDefault("http_no_mime_sniff_header", false)
err := viper.ReadInConfig()
if err != nil {
@ -109,9 +121,7 @@ func LoadConfig(configFile string) (*Config, error) {
config := &Config{
RootDir: rootDir,
HTTPPort: viper.GetString("http_port"),
WatchMode: viper.GetString("watch_mode"),
CrawlModeCrawlInterval: viper.GetInt("crawl_mode_crawl_interval"),
WatchInterval: viper.GetInt("watch_interval"),
DirectoryCrawlers: viper.GetInt("crawl_mode_crawl_interval"),
CrawlWorkers: viper.GetInt("crawl_workers"),
CacheSize: viper.GetInt("cache_size"),
@ -121,6 +131,7 @@ func LoadConfig(configFile string) (*Config, error) {
InitialCrawl: viper.GetBool("initial_crawl"),
CacheRecacheCrawlerLimit: viper.GetInt("cache_recache_crawler_limit"),
CrawlerParseMIME: viper.GetBool("crawler_parse_mime"),
CrawlerParseEncoding: viper.GetBool("crawler_parse_encoding"),
HttpAPIListCacheControl: viper.GetInt("http_api_list_cache_control"),
HttpAPIDlCacheControl: viper.GetInt("http_api_download_cache_control"),
HttpAllowDirMimeParse: viper.GetBool("http_allow_dir_mime_parse"),
@ -142,10 +153,8 @@ func LoadConfig(configFile string) (*Config, error) {
ElasticsearchAllowConcurrentSyncs: viper.GetBool("elasticsearch_allow_concurrent_syncs"),
ElasticsearchFullSyncOnStart: viper.GetBool("elasticsearch_full_sync_on_start"),
ElasticsearchDefaultQueryField: viper.GetString("elasticsearch_default_query_field"),
}
if config.WatchMode != "crawl" && config.WatchMode != "watch" {
return nil, errors.New("watch_mode must be 'crawl' or 'watch'")
HTTPRealIPHeader: viper.GetString("http_real_ip_header"),
HTTPNoMimeSniffHeader: viper.GetBool("http_no_mime_sniff_header"),
}
if config.CacheTime < 0 {
@ -188,5 +197,13 @@ func LoadConfig(configFile string) (*Config, error) {
return nil, errors.New("elasticsearch_full_sync_interval must be greater than elasticsearch_sync_interval")
}
cfg = config
return config, nil
}
func GetConfig() *Config {
if cfg == nil {
panic("Config has not been set!")
}
return cfg
}

View File

@ -1,13 +1,13 @@
package config
// Config constants
var FollowSymlinks bool
var CachePrintNew bool
var RootDir string
var CrawlerParseMIME bool
var MaxWorkers int
var HttpAllowDuringInitialCrawl bool
var RestrictedDownloadPaths []string
var ElasticsearchEnable bool
var ElasticsearchEndpoint string
var ElasticsearchSyncInterval int
//var CachePrintNew bool
//var RootDir string
//var CrawlerParseMIME bool
//var MaxWorkers int
//var HttpAllowDuringInitialCrawl bool
//var RestrictedDownloadPaths []string
//var ElasticsearchEnable bool
//var ElasticsearchEndpoint string
//var ElasticsearchSyncInterval int

View File

@ -22,7 +22,6 @@ import (
)
var log *logrus.Logger
var cfg *config.Config
type cliConfig struct {
configFile string
@ -79,7 +78,7 @@ func main() {
}
var err error
cfg, err = config.LoadConfig(cliArgs.configFile)
cfg, err := config.SetConfig(cliArgs.configFile)
if err != nil {
log.Fatalf("Failed to load config file: %s", err)
}
@ -89,30 +88,19 @@ func main() {
log.Fatal(err)
}
// Set config variables
// TODO: just pass the entire cfg object
config.FollowSymlinks = false
config.CachePrintNew = cfg.CachePrintNew
config.RootDir = cfg.RootDir
config.CrawlerParseMIME = cfg.CrawlerParseMIME
config.MaxWorkers = cfg.CrawlWorkers
config.HttpAllowDuringInitialCrawl = cfg.HttpAllowDuringInitialCrawl
DirectoryCrawler.JobQueueSize = cfg.WorkersJobQueueSize
config.RestrictedDownloadPaths = cfg.RestrictedDownloadPaths
config.ElasticsearchEnable = cfg.ElasticsearchEnable
config.ElasticsearchEndpoint = cfg.ElasticsearchEndpoint
config.ElasticsearchSyncInterval = cfg.ElasticsearchSyncInterval
// Set global variables.
config.FollowSymlinks = false // TODO: make sure this works then set it based on the config yml
log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable)
// Init global variables
//DirectoryCrawler.CrawlWorkerPool = DirectoryCrawler.NewWorkerPool(config.MaxWorkers)
DirectoryCrawler.WorkerPool = make(chan struct{}, config.MaxWorkers)
DirectoryCrawler.WorkerPool = make(chan struct{}, cfg.CrawlWorkers)
cache.InitRecacheSemaphore(cfg.CacheRecacheCrawlerLimit)
// Start the webserver before doing the long crawl
r := api.NewRouter(cfg, sharedCache)
r := api.NewRouter(sharedCache)
//log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r))
go func() {
err := http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r)
@ -125,28 +113,17 @@ func main() {
if cliArgs.initialCrawl || cfg.InitialCrawl {
log.Infoln("Preforming initial crawl...")
start := time.Now()
cache.InitialCrawl(sharedCache, cfg)
cache.InitialCrawl(sharedCache)
duration := time.Since(start).Round(time.Second)
keys := sharedCache.Keys()
log.Infof("Initial crawl completed in %s. %d items added to the cache.", duration, len(keys))
}
if cfg.WatchMode == "watch" {
log.Debugln("Starting the watcher process")
watcher, err := cache.StartWatcher(cfg.RootDir, sharedCache, cfg)
if err != nil {
log.Fatalf("Failed to start watcher process: %s", err)
}
log.Infoln("Started the watcher process")
defer watcher.Close()
} else if cfg.WatchMode == "crawl" {
//log.Debugln("Starting the crawler")
err := cache.StartCrawler(sharedCache, cfg)
if err != nil {
log.Fatalf("Failed to start timed crawler process: %s", err)
}
log.Infoln("Started the timed crawler process")
err = cache.StartCrawler(sharedCache)
if err != nil {
log.Fatalf("Failed to start timed crawler process: %s", err)
}
log.Infoln("Started the timed crawler process")
if cfg.ElasticsearchEnable {
// If we fail to establish a connection to Elastic, don't kill the entire server.
@ -167,7 +144,7 @@ func main() {
elastic.ElasticClient = es
if cfg.ElasticsearchSyncEnable && !cliArgs.disableElasticSync {
go elastic.ElasticsearchThread(sharedCache, cfg)
go elastic.ElasticsearchThread(sharedCache)
log.Info("Started the background Elasticsearch sync thread.")
} else {
log.Info("The background Elasticsearch sync thread is disabled.")

View File

@ -9,51 +9,51 @@ import (
"time"
)
func ElasticsearchThread(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config) {
createCrazyfsIndex(cfg)
func ElasticsearchThread(sharedCache *lru.Cache[string, *CacheItem.Item]) {
createCrazyfsIndex()
// Test connection to Elastic.
esContents, err := getPathsFromIndex(cfg)
esContents, err := getPathsFromIndex()
if err != nil {
logElasticConnError(err)
return
}
esSize := len(esContents)
log.Infof(`ELASTIC - index "%s" contains %d items.`, cfg.ElasticsearchIndex, esSize)
log.Infof(`ELASTIC - index "%s" contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
var wg sync.WaitGroup
sem := make(chan bool, cfg.ElasticsearchSyncThreads)
sem := make(chan bool, config.GetConfig().ElasticsearchSyncThreads)
// Run a partial sync at startup, unless configured to run a full one.
syncElasticsearch(sharedCache, cfg, &wg, sem, cfg.ElasticsearchFullSyncOnStart)
syncElasticsearch(sharedCache, &wg, sem, config.GetConfig().ElasticsearchFullSyncOnStart)
ticker := time.NewTicker(time.Duration(cfg.ElasticsearchSyncInterval) * time.Second)
fullSyncTicker := time.NewTicker(time.Duration(cfg.ElasticsearchFullSyncInterval) * time.Second)
ticker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchSyncInterval) * time.Second)
fullSyncTicker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchFullSyncInterval) * time.Second)
var mutex sync.Mutex
for {
select {
case <-ticker.C:
if !cfg.ElasticsearchAllowConcurrentSyncs {
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
mutex.Lock()
}
syncElasticsearch(sharedCache, cfg, &wg, sem, false)
if !cfg.ElasticsearchAllowConcurrentSyncs {
syncElasticsearch(sharedCache, &wg, sem, false)
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
mutex.Unlock()
}
case <-fullSyncTicker.C:
if !cfg.ElasticsearchAllowConcurrentSyncs {
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
mutex.Lock()
}
syncElasticsearch(sharedCache, cfg, &wg, sem, true)
if !cfg.ElasticsearchAllowConcurrentSyncs {
syncElasticsearch(sharedCache, &wg, sem, true)
if !config.GetConfig().ElasticsearchAllowConcurrentSyncs {
mutex.Unlock()
}
}
}
}
func syncElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config, wg *sync.WaitGroup, sem chan bool, fullSync bool) {
func syncElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], wg *sync.WaitGroup, sem chan bool, fullSync bool) {
var syncType string
var esContents []string
if fullSync {
@ -64,7 +64,7 @@ func syncElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *con
syncType = "refresh"
var err error
esContents, err = getPathsFromIndex(cfg)
esContents, err = getPathsFromIndex()
if err != nil {
log.Errorf("ELASTIC - Failed to read the index: %s", err)
return
@ -82,14 +82,14 @@ func syncElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *con
if !found {
log.Fatalf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache!`, key)
} else {
if !shouldExclude(key, cfg.ElasticsearchExcludePatterns) {
if !shouldExclude(key, config.GetConfig().ElasticsearchExcludePatterns) {
if fullSync {
addToElasticsearch(cacheItem, cfg)
addToElasticsearch(cacheItem)
} else if !slices.Contains(esContents, key) {
addToElasticsearch(cacheItem, cfg)
addToElasticsearch(cacheItem)
}
} else {
deleteFromElasticsearch(key, cfg) // clean up
deleteFromElasticsearch(key) // clean up
//log.Debugf(`ELASTIC - skipping adding "%s"`, key)
}
}
@ -99,7 +99,7 @@ func syncElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *con
wg.Wait()
log.Debugln("ELASTIC - Checking for removed items...")
removeStaleItemsFromElasticsearch(sharedCache, cfg)
removeStaleItemsFromElasticsearch(sharedCache)
if fullSync {
ElasticRefreshSyncRunning = false

View File

@ -9,7 +9,7 @@ import (
"github.com/elastic/go-elasticsearch/v8/esapi"
)
func addToElasticsearch(item *CacheItem.Item, cfg *config.Config) {
func addToElasticsearch(item *CacheItem.Item) {
log.Debugf(`ELASTIC - Adding: "%s"`, item.Path)
prepareCacheItem(item)
data, err := json.Marshal(item)
@ -18,7 +18,7 @@ func addToElasticsearch(item *CacheItem.Item, cfg *config.Config) {
return
}
req := esapi.IndexRequest{
Index: cfg.ElasticsearchIndex,
Index: config.GetConfig().ElasticsearchIndex,
DocumentID: encodeToBase64(item.Path),
Body: bytes.NewReader(data),
Refresh: "true",

View File

@ -10,16 +10,16 @@ import (
"sync"
)
func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item], cfg *config.Config) {
func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem.Item]) {
// Retrieve all keys from Elasticsearch
keys, err := getPathsFromIndex(cfg)
keys, err := getPathsFromIndex()
if err != nil {
log.Errorf("ELASTIC - Error retrieving keys from Elasticsearch: %s", err)
return
}
// Create a buffered channel as a semaphore
sem := make(chan struct{}, cfg.ElasticsearchSyncThreads)
sem := make(chan struct{}, config.GetConfig().ElasticsearchSyncThreads)
// Create a wait group to wait for all goroutines to finish
var wg sync.WaitGroup
@ -41,7 +41,7 @@ func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem
if _, ok := sharedCache.Get(key); !ok {
// If a key does not exist in the LRU cache, delete it from Elasticsearch
deleteFromElasticsearch(key, cfg)
deleteFromElasticsearch(key)
log.Debugf(`ELASTIC - Removed key "%s"`, key)
}
}(key)
@ -51,9 +51,9 @@ func removeStaleItemsFromElasticsearch(sharedCache *lru.Cache[string, *CacheItem
wg.Wait()
}
func deleteFromElasticsearch(key string, cfg *config.Config) {
func deleteFromElasticsearch(key string) {
req := esapi.DeleteRequest{
Index: cfg.ElasticsearchIndex,
Index: config.GetConfig().ElasticsearchIndex,
DocumentID: encodeToBase64(key),
}

View File

@ -4,9 +4,9 @@ import (
"crazyfs/config"
)
func createCrazyfsIndex(cfg *config.Config) {
func createCrazyfsIndex() {
// Check if index exists
res, err := ElasticClient.Indices.Exists([]string{cfg.ElasticsearchIndex})
res, err := ElasticClient.Indices.Exists([]string{config.GetConfig().ElasticsearchIndex})
if err != nil {
log.Fatalf("Error checking if index exists: %s", err)
}
@ -16,7 +16,7 @@ func createCrazyfsIndex(cfg *config.Config) {
if res.StatusCode == 401 {
log.Fatalln("ELASTIC - Failed to create a new index: got code 401.")
} else if res.StatusCode == 404 {
res, err = ElasticClient.Indices.Create(cfg.ElasticsearchIndex)
res, err = ElasticClient.Indices.Create(config.GetConfig().ElasticsearchIndex)
if err != nil {
log.Fatalf("Error creating index: %s", err)
}
@ -26,6 +26,6 @@ func createCrazyfsIndex(cfg *config.Config) {
log.Printf("Error creating index: %s", res.String())
}
log.Infof(`Created a new index named "%s"`, cfg.ElasticsearchIndex)
log.Infof(`Created a new index named "%s"`, config.GetConfig().ElasticsearchIndex)
}
}

View File

@ -10,7 +10,7 @@ import (
"time"
)
func getPathsFromIndex(cfg *config.Config) ([]string, error) {
func getPathsFromIndex() ([]string, error) {
// This may take a bit if the index is very large, so avoid calling this.
// Print a debug message so the user doesn't think we're frozen.
@ -21,7 +21,7 @@ func getPathsFromIndex(cfg *config.Config) ([]string, error) {
res, err := ElasticClient.Search(
ElasticClient.Search.WithContext(context.Background()),
ElasticClient.Search.WithIndex(cfg.ElasticsearchIndex),
ElasticClient.Search.WithIndex(config.GetConfig().ElasticsearchIndex),
ElasticClient.Search.WithScroll(time.Minute),
ElasticClient.Search.WithSize(1000),
)

View File

@ -10,9 +10,7 @@ import (
"strings"
)
func Search(query string, exclude []string, cfg *config.Config) (*esapi.Response, error) {
log.Debugf(`ELASTIC - Query: "%s"`, query)
func Search(query string, exclude []string) (*esapi.Response, error) {
var excludeQuery string
if len(exclude) > 0 {
var excludeConditions []string
@ -37,11 +35,11 @@ func Search(query string, exclude []string, cfg *config.Config) (*esapi.Response
return ElasticClient.Search(
ElasticClient.Search.WithContext(context.Background()),
ElasticClient.Search.WithIndex(cfg.ElasticsearchIndex),
ElasticClient.Search.WithIndex(config.GetConfig().ElasticsearchIndex),
ElasticClient.Search.WithBody(strings.NewReader(esQuery)),
ElasticClient.Search.WithTrackTotalHits(true),
ElasticClient.Search.WithPretty(),
ElasticClient.Search.WithSize(cfg.ApiSearchMaxResults),
ElasticClient.Search.WithSize(config.GetConfig().ApiSearchMaxResults),
)
}

36
src/file/encoding.go Normal file
View File

@ -0,0 +1,36 @@
package file
import (
"github.com/saintfish/chardet"
"os"
"strings"
)
func DetectFileEncoding(filePath string) (string, error) {
file, err := os.Open(filePath)
if err != nil {
return "", err
}
defer file.Close()
bytes, err := os.ReadFile(filePath)
if err != nil {
return "", err
}
// Detect the encoding
detector := chardet.NewTextDetector()
result, err := detector.DetectBest(bytes)
if err != nil {
return "", err
}
return result.Charset, nil
}
func CastTextMimes(mimeType string) string {
if strings.HasPrefix(mimeType, "text/") {
return "text/plain"
}
return mimeType
}

View File

@ -49,7 +49,7 @@ func GetMimeType(path string, analyze bool, passedInfo *os.FileInfo) (bool, stri
if analyze {
MIME, err = mimetype.DetectFile(path)
if err != nil {
log.Warnf("Error analyzing MIME type: %v", err)
log.Errorf("Error analyzing MIME type: %v", err)
return false, "", "", err
}
mimeType = MIME.String()
@ -66,10 +66,10 @@ func GetMimeType(path string, analyze bool, passedInfo *os.FileInfo) (bool, stri
}
func StripRootDir(path string) string {
if path == "/" || path == config.RootDir || path == "" {
if path == "/" || path == config.GetConfig().RootDir || path == "" {
// Avoid erasing our path
return "/"
} else {
return strings.TrimSuffix(strings.TrimPrefix(path, config.RootDir), "/")
return strings.TrimSuffix(strings.TrimPrefix(path, config.GetConfig().RootDir), "/")
}
}

View File

@ -10,7 +10,7 @@ import (
// SafeJoin Clean the provided path
func SafeJoin(pathArg string) (string, error) {
cleanPath := filepath.Join(config.RootDir, filepath.Clean(pathArg))
cleanPath := filepath.Join(config.GetConfig().RootDir, filepath.Clean(pathArg))
cleanPath = strings.TrimRight(cleanPath, "/")
return cleanPath, nil
}
@ -33,10 +33,10 @@ func DetectTraversal(pathArg string) (bool, error) {
}
cleanArg := filepath.Clean(pathArg)
cleanPath := filepath.Join(config.RootDir, cleanArg)
cleanPath := filepath.Join(config.GetConfig().RootDir, cleanArg)
// If the path is not within the base path, return an error
if !strings.HasPrefix(cleanPath, config.RootDir) {
if !strings.HasPrefix(cleanPath, config.GetConfig().RootDir) {
return true, fmt.Errorf("the full path is outside the root dir: %s", pathArg)
}

View File

@ -13,7 +13,7 @@ require (
github.com/klauspost/compress v1.16.7
github.com/mitchellh/mapstructure v1.5.0
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/radovskyb/watcher v1.0.7
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d
github.com/sirupsen/logrus v1.9.3
github.com/spf13/viper v1.16.0
)

View File

@ -164,10 +164,10 @@ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qR
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/radovskyb/watcher v1.0.7 h1:AYePLih6dpmS32vlHfhCeli8127LzkIgwJGcwwe8tUE=
github.com/radovskyb/watcher v1.0.7/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm+ZuvsUYIg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d h1:hrujxIzL1woJ7AwssoOcM/tq5JjjG2yYOc8odClEiXA=
github.com/saintfish/chardet v0.0.0-20230101081208-5e3ef4b5456d/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM=

View File

@ -1,8 +1,10 @@
package logging
import (
"crazyfs/config"
"net"
"net/http"
"strings"
)
type statusWriter struct {
@ -15,13 +17,32 @@ func (sw *statusWriter) WriteHeader(status int) {
sw.ResponseWriter.WriteHeader(status)
}
// TODO: handle the proxy http headers
func GetRealIP(r *http.Request) string {
ip, _, _ := net.SplitHostPort(r.RemoteAddr) // Get the IP address without port number
// Check if the request was forwarded by a proxy
var forwarded string
if config.GetConfig().HTTPRealIPHeader == "X-Forwarded-For" {
// The X-Forwarded-For header can contain multiple IPs, use the first one
if forwarded = r.Header.Get(config.GetConfig().HTTPRealIPHeader); forwarded != "" {
split := strings.Split(forwarded, ",")
ip = strings.TrimSpace(split[0])
}
} else {
// Or just use the header the user specified.
forwarded = r.Header.Get(config.GetConfig().HTTPRealIPHeader)
}
return ip
}
func LogRequest(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sw := statusWriter{ResponseWriter: w, status: http.StatusOK} // set default status
handler.ServeHTTP(&sw, r)
ip, _, _ := net.SplitHostPort(r.RemoteAddr) // Get the IP address without port number
ip := GetRealIP(r)
log.Infof("%s - %d - %s from %s", r.Method, sw.status, r.URL.RequestURI(), ip)
})
}

View File

@ -1,5 +1,13 @@
- Track active crawls and list them on the admin page
- Limit to one on-demand crawl per path. Don't start another if one is already running. See HandleFileNotFound()
- Add config value to limit the number of on-demand crawls
- Add config value to limit the number of concurrent crawls, other crawls get queued.
- add an admin endpoint to fetch the last n modified files.
- fix /api/file/download when an item is in the cache but does not exist on the disk
- Is using scroll for the Elastic query really the best way to do a real-time query?
Later:
- Add a wildcard option to restricted_download_paths to block all sub-directories
- Add a dict to each restricted_download_paths item to specify how many levels recursive the block should be applied
- Add an endpoint to return restricted_download_paths so the frontend can block downloads for those folders
- Load the config into a global variable and stop passing it as function args
- Remove the file change watcher mode
- add a "last modified" to "sort" https://chub-archive.evulid.cc/api/file/list?path=/chub.ai/characters&page=1&limit=50&sort=folders