improve startup sequence, fix the elastic key format

This commit is contained in:
Cyberes 2023-12-13 20:09:02 -07:00
parent 0469529f54
commit 0ee56f20e7
12 changed files with 144 additions and 97 deletions

View File

@ -1,6 +1,6 @@
# crazy-file-server
*A heavy-duty web file browser for CRAZY files.*
*A heavy-duty web file browser for cRaZy files.*
The whole schtick of this program is that it caches the directory and file structures so that the server doesn't have to
re-read the disk on every request. By doing the processing upfront when the server starts along with some background

View File

@ -49,8 +49,12 @@ func HandleFileNotFound(relPath string, fullPath string, w http.ResponseWriter)
ReturnFake404Msg("path not found", w)
return nil
} else if err != nil {
log.Errorf("HandleFileNotFound - crawl failed: %s", err)
Return500Msg(w)
if os.IsNotExist(err) {
ReturnFake404Msg("path not found", w)
} else {
log.Errorf("HandleFileNotFound - crawl failed: %s", err)
Return500Msg(w)
}
return nil
}

View File

@ -151,7 +151,7 @@ func SearchFile(w http.ResponseWriter, r *http.Request) {
})
}
searchDuration := time.Since(searchStart).Round(time.Second)
searchDuration := time.Since(searchStart) // .Round(time.Second)
log.Debugf(`SEARCH - %s - Query: "%s" - Results: %d - Elapsed: %d`, logging.GetRealIP(r), queryString, len(results), searchDuration)
response := map[string]interface{}{

View File

@ -52,7 +52,7 @@ func startCrawl(wg *sync.WaitGroup, crawlerChan chan struct{}) {
func logCacheStatus(msg string, ticker *time.Ticker, logFn func(format string, args ...interface{})) {
defer ticker.Stop()
for range ticker.C {
if !config.GetConfig().ElasticsearchSyncEnable {
if !config.GetConfig().ElasticsearchSyncEnable || InitialCrawlInProgress {
logStr := "%s - %d/%d items in the cache. Busy Workers: %d. Jobs queued: %d. Running crawls: %d."
logFn(logStr,
msg, len(SharedCache.Cache.Keys()), config.GetConfig().CacheSize, Workers.BusyWorkers, Workers.Queue.GetQueueSize(), DirectoryCrawler.GetTotalActiveCrawls())

View File

@ -14,7 +14,6 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/sirupsen/logrus"
"net/http"
//_ "net/http/pprof" // for profiling
"os"
"path/filepath"
"time"
@ -93,8 +92,38 @@ func main() {
log.Infof("Elasticsearch enabled: %t", cfg.ElasticsearchEnable)
if cfg.ElasticsearchSyncEnable {
elastic.InitializeWorkers()
// Start the Elastic connection so it can initialize while we're doing the initial crawl.
// If we fail to establish a connection to Elastic, don't kill the entire server. Instead, just disable Elastic.
if cfg.ElasticsearchEnable {
esCfg := elasticsearch.Config{
Addresses: []string{
cfg.ElasticsearchEndpoint,
},
APIKey: cfg.ElasticsearchAPIKey,
}
es, err := elasticsearch.NewClient(esCfg)
if err != nil {
log.Errorf("Error creating the Elasticsearch client: %s", err)
elastic.LogElasticQuit()
elastic.ElasticEnabled = false
} else {
elastic.ElasticClient = es
go func() {
// This could take a minute, so we do this in the background while we crawl.
elastic.EnableElasticsearchConnection()
for cache.InitialCrawlInProgress {
// Sleep while the initial crawl is running.
time.Sleep(1 * time.Second)
}
if !cliArgs.disableElasticSync || !cfg.ElasticsearchSyncEnable {
elastic.InitializeWorkers()
go elastic.SyncThread()
log.Info("Started the background Elasticsearch sync thread.")
} else {
log.Info("The background Elasticsearch sync thread is disabled.")
}
}()
}
}
Workers.InitializeWorkers()
@ -103,7 +132,6 @@ func main() {
// Start the webserver before doing the long crawl
r := api.NewRouter()
//log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r))
go func() {
err := http.ListenAndServe(fmt.Sprintf(":%s", cfg.HTTPPort), r)
if err != nil {
@ -126,40 +154,7 @@ func main() {
if err != nil {
log.Fatalf("Failed to start timed crawler process: %s", err)
}
log.Infoln("Started the timed crawler process")
if cfg.ElasticsearchEnable {
// If we fail to establish a connection to Elastic, don't kill the entire server.
// Instead, just disable Elastic.
esCfg := elasticsearch.Config{
Addresses: []string{
cfg.ElasticsearchEndpoint,
},
APIKey: cfg.ElasticsearchAPIKey,
}
es, err := elasticsearch.NewClient(esCfg)
if err != nil {
log.Errorf("Error creating the Elasticsearch client: %s", err)
elastic.LogElasticQuit()
cfg.ElasticsearchEnable = false
} else {
elastic.ElasticClient = es
if cfg.ElasticsearchSyncEnable && !cliArgs.disableElasticSync {
go elastic.SyncThread()
log.Info("Started the background Elasticsearch sync thread.")
} else {
log.Info("The background Elasticsearch sync thread is disabled.")
}
}
}
// For profiling
//go func() {
// log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
//}()
log.Infof("Started the timed crawler process. Interval: %d", config.GetConfig().CrawlModeCrawlInterval)
select {}
}

View File

@ -5,7 +5,8 @@ import "sync"
// More or less like the other queue implementation.
type DeleteJob struct {
Key string
Key string
Path string
}
type DeleteJobQueue struct {

View File

@ -24,10 +24,10 @@ func worker() {
job := Queue.GetJob()
atomic.AddInt32(&BusyWorkers, 1)
if _, ok := SharedCache.Cache.Get(job.Key); !ok {
if _, ok := SharedCache.Cache.Get(job.Path); !ok {
// If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
deleteFromElasticsearch(job.Key)
log.Debugf(`ELASTIC - Removed key "%s"`, job.Key)
log.Debugf(`ELASTIC - Removed path: "%s"`, job.Path)
}
atomic.AddInt32(&BusyWorkers, -1)

View File

@ -11,19 +11,18 @@ var Queue *DeleteJobQueue
var syncLock sync.Mutex
func SyncThread() {
createCrazyfsIndex()
var ElasticEnabled bool
// Test connection to Elastic.
esSize, err := getElasticSize()
if err != nil {
logElasticConnError(err)
func SyncThread() {
if !ElasticEnabled {
LogElasticQuit()
return
}
log.Infof(`ELASTIC - index "%s" contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
createCrazyfsIndex()
// Run a partial sync at startup, unless configured to run a full one.
syncElasticsearch(false)
syncElasticsearch(config.GetConfig().ElasticsearchFullSyncOnStart)
ticker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchSyncInterval) * time.Second)
fullSyncTicker := time.NewTicker(time.Duration(config.GetConfig().ElasticsearchFullSyncInterval) * time.Second)
@ -41,6 +40,11 @@ func SyncThread() {
// TODO: make this use workers instead of starting a million threads
// TODO: have the workers exit when the sync job is finished
func syncElasticsearch(doFullSync bool) {
if !ElasticEnabled {
log.Errorln("ELASTIC - disabled, not syncing.")
return
}
// Only one sync at a time. Also helps to prevent races with the global variables.
syncLock.Lock()
@ -62,7 +66,7 @@ func syncElasticsearch(doFullSync bool) {
// Set global variables for the workers to read.
fullSync = doFullSync
var err error
existingKeys, err = getPathsFromIndex()
globalKeysByPath, globalPathsByKey, err = getPathsFromIndex()
if err != nil {
log.Errorf("ELASTIC - Error retrieving keys from Elasticsearch: %s", err)
return
@ -84,10 +88,21 @@ func syncElasticsearch(doFullSync bool) {
func logElasticConnError(err error) {
log.Errorf("ELASTIC - Failed to read the index: %s", err)
LogElasticQuit()
}
func EnableElasticsearchConnection() {
// Test connection to Elastic.
esSize, err := getElasticSize()
if err != nil {
logElasticConnError(err)
ElasticEnabled = false
return
}
ElasticEnabled = true
log.Infof(`ELASTIC - Connected to index "%s". Contains %d items.`, config.GetConfig().ElasticsearchIndex, esSize)
}
func LogElasticQuit() {
ElasticEnabled = false
log.Errorln("ELASTIC - background thread exiting, Elastic indexing and search will not be available.")
}

View File

@ -10,27 +10,31 @@ import (
"encoding/json"
"github.com/elastic/go-elasticsearch/v8/esapi"
"os"
"slices"
)
// existingKeys is a global variable called by the Walker callback: addToElasticsearch().
// It is set only by syncElasticsearch() when a sync is started. Only one sync can run at a time.
// A global is needed since there is no way to pass variables like this to the workers.
var existingKeys []string
var globalKeysByPath map[string]string
var globalPathsByKey map[string]string
// fullSync is another global variable accessed by the workers and set by syncElasticsearch()
var fullSync bool
func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) error {
key := file.StripRootDir(fullPath)
cacheItem, found := SharedCache.Cache.Get(key)
if !found {
log.Fatalf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache!`, key)
} else {
if !shouldExclude(key, config.GetConfig().ElasticsearchExcludePatterns) {
if fullSync {
preformAddToElasticsearch(cacheItem)
} else if !slices.Contains(existingKeys, key) {
relPath := file.StripRootDir(fullPath)
if !shouldExclude(relPath, config.GetConfig().ElasticsearchExcludePatterns) {
cacheItem, found := SharedCache.Cache.Get(relPath)
if !found {
log.Warnf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache! Deleting this item from Elastic. This error can probably be ignored.`, relPath)
deleteFromElasticsearch(relPath)
} else {
if _, ok := globalPathsByKey[relPath]; ok {
// Item already exists.
if fullSync {
preformAddToElasticsearch(cacheItem)
}
} else {
preformAddToElasticsearch(cacheItem)
}
}
@ -39,20 +43,24 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
}
func preformAddToElasticsearch(item *CacheItem.Item) {
log.Debugf(`ELASTIC - Adding: "%s"`, item.Path)
prepareCacheItem(item)
data, err := json.Marshal(item)
preparedItem, err := prepareCacheItem(item)
if err != nil {
log.Printf("Error marshaling item: %s", err)
log.Printf("ELASTIC - Error preparing new item: %s", err)
return
}
data, err := json.Marshal(preparedItem)
if err != nil {
log.Printf("ELASTIC - Error marshaling new item: %s", err)
return
}
req := esapi.IndexRequest{
Index: config.GetConfig().ElasticsearchIndex,
DocumentID: encodeToBase64(item.Path),
Body: bytes.NewReader(data),
Refresh: "true",
}
res, err := req.Do(context.Background(), ElasticClient)
if err != nil {
log.Errorf("ELASTIC - Error getting response: %s", err)
@ -67,15 +75,26 @@ func preformAddToElasticsearch(item *CacheItem.Item) {
}
log.Errorf(`ELASTIC - Error indexing document "%s" - Status code: %d - %s`, item.Path, res.StatusCode, e)
}
log.Debugf(`ELASTIC - Added: "%s"`, preparedItem.Path)
}
// prepareCacheItem is used to get an item ready to insert into Elastic.
func prepareCacheItem(item *CacheItem.Item) {
// We don't care about the children, and this field's length may cause issues.
item.Children = nil
// The length of this one also may cause issues.
item.Content = ""
// Don't need to return anything since `item` is a pointer.
func prepareCacheItem(item *CacheItem.Item) (*CacheItem.Item, error) {
resolvedItem := CacheItem.Item{
Path: item.Path,
Name: item.Name,
Size: item.Size,
Extension: item.Extension,
Modified: item.Modified,
Mode: item.Mode,
IsDir: item.IsDir,
IsSymlink: item.IsSymlink,
MimeType: item.MimeType,
Encoding: item.Encoding,
Children: nil, // We don't care about the children, and this field's length may cause issues.
Content: "", // The length of this one also may cause issues.
Cached: item.Cached,
}
return &resolvedItem, nil
}

View File

@ -9,7 +9,7 @@ import (
func startRemoveStaleItemsFromElasticsearch() {
// Retrieve all keys from Elasticsearch
keys, err := getPathsFromIndex()
pathsByKey, _, err := getPathsFromIndex()
if err != nil {
log.Errorf("ELASTIC - Error retrieving keys from Elasticsearch: %s", err)
return
@ -18,15 +18,15 @@ func startRemoveStaleItemsFromElasticsearch() {
log.Debugln("ELASTIC - Checking for removed items...")
// For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache).
for _, key := range keys {
go Queue.AddJob(DeleteJob{Key: key})
for key, path := range pathsByKey {
go Queue.AddJob(DeleteJob{Key: key, Path: path})
}
}
func deleteFromElasticsearch(key string) {
req := esapi.DeleteRequest{
Index: config.GetConfig().ElasticsearchIndex,
DocumentID: encodeToBase64(key),
DocumentID: key,
}
res, err := req.Do(context.Background(), ElasticClient)

View File

@ -25,3 +25,12 @@ func encodeToBase64(s string) string {
// Used to encode key names to base64 since file paths aren't very Elastic-friendly.
return base64.RawURLEncoding.EncodeToString([]byte(s))
}
func decodeFromBase64(s string) (string, error) {
// Used to decode base64 encoded strings back to their original form.
data, err := base64.RawURLEncoding.DecodeString(s)
if err != nil {
return "", err
}
return string(data), nil
}

View File

@ -11,20 +11,21 @@ import (
)
func getElasticSize() (int, error) {
esContents, err := getPathsFromIndex()
keysByPath, _, err := getPathsFromIndex()
if err != nil {
return -1, nil
}
return len(esContents), nil
return len(keysByPath), nil
}
func getPathsFromIndex() ([]string, error) {
func getPathsFromIndex() (map[string]string, map[string]string, error) {
// This may take a bit if the index is very large, so avoid calling this.
// Print a debug message so the user doesn't think we're frozen.
log.Debugln("ELASTIC - Fetching indexed paths from Elasticsearch...")
var paths []string
keysByPath := make(map[string]string)
pathsByKey := make(map[string]string)
var r map[string]interface{}
res, err := ElasticClient.Search(
@ -35,13 +36,13 @@ func getPathsFromIndex() ([]string, error) {
)
if err != nil {
msg := fmt.Sprintf("Error getting response: %s", err)
return nil, errors.New(msg)
return nil, nil, errors.New(msg)
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
msg := fmt.Sprintf("Error parsing the response body: %s", err)
return nil, errors.New(msg)
return nil, nil, errors.New(msg)
}
for {
@ -55,10 +56,13 @@ func getPathsFromIndex() ([]string, error) {
// Iterate the document "hits" returned by API call
for _, hit := range hits {
doc := hit.(map[string]interface{})["_source"].(map[string]interface{})
hitMap := hit.(map[string]interface{})
doc := hitMap["_source"].(map[string]interface{})
path, ok := doc["path"].(string)
if ok {
paths = append(paths, path)
key := hitMap["_id"].(string)
pathsByKey[key] = path
keysByPath[path] = key
}
}
@ -66,13 +70,13 @@ func getPathsFromIndex() ([]string, error) {
res, err = ElasticClient.Scroll(ElasticClient.Scroll.WithScrollID(scrollID), ElasticClient.Scroll.WithScroll(time.Minute))
if err != nil {
msg := fmt.Sprintf("Error getting response: %s", err)
return nil, errors.New(msg)
return nil, nil, errors.New(msg)
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
msg := fmt.Sprintf("Error getting response: %s", err)
return nil, errors.New(msg)
return nil, nil, errors.New(msg)
}
}
@ -83,9 +87,9 @@ func getPathsFromIndex() ([]string, error) {
clearScrollResponse, err := clearScrollRequest.Do(context.Background(), ElasticClient)
if err != nil {
msg := fmt.Sprintf("Error clearing scroll: %s", err)
return nil, errors.New(msg)
return nil, nil, errors.New(msg)
}
defer clearScrollResponse.Body.Close()
return paths, nil
return pathsByKey, keysByPath, nil
}