fix wrong key being used for delete
This commit is contained in:
parent
39513ffc36
commit
3af85db036
|
@ -31,7 +31,7 @@ func AdminCacheInfo(w http.ResponseWriter, r *http.Request) {
|
||||||
"cacheMax": config.GetConfig().CacheSize,
|
"cacheMax": config.GetConfig().CacheSize,
|
||||||
"recacheCrawlLimit": config.GetConfig().CacheRecacheCrawlerLimit,
|
"recacheCrawlLimit": config.GetConfig().CacheRecacheCrawlerLimit,
|
||||||
"newSyncRunning": elastic.FullSyncRunning,
|
"newSyncRunning": elastic.FullSyncRunning,
|
||||||
"refreshSyncRunning": elastic.FullSyncRunning,
|
"refreshSyncRunning": elastic.RefreshSyncRunning,
|
||||||
}
|
}
|
||||||
w.Header().Set("Cache-Control", "no-store")
|
w.Header().Set("Cache-Control", "no-store")
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
|
@ -26,8 +26,12 @@ func worker() {
|
||||||
|
|
||||||
if _, ok := SharedCache.Cache.Get(job.Path); !ok {
|
if _, ok := SharedCache.Cache.Get(job.Path); !ok {
|
||||||
// If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
|
// If a key in Elastic does not exist in the LRU cache, delete it from Elastic.
|
||||||
deleteFromElasticsearch(job.Key)
|
err := deleteFromElasticsearch(job.Key)
|
||||||
log.Debugf(`ELASTIC - Removed path: "%s"`, job.Path)
|
if err != nil {
|
||||||
|
log.Errorf(`ELASTIC - Error deleting key "%s" - %s`, job.Key, err)
|
||||||
|
} else {
|
||||||
|
log.Debugf(`ELASTIC - Deleted path: "%s"`, job.Path)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic.AddInt32(&BusyWorkers, -1)
|
atomic.AddInt32(&BusyWorkers, -1)
|
||||||
|
|
|
@ -27,7 +27,7 @@ func addToElasticsearch(fullPath string, info os.FileInfo, incomingErr error) er
|
||||||
cacheItem, found := SharedCache.Cache.Get(relPath)
|
cacheItem, found := SharedCache.Cache.Get(relPath)
|
||||||
if !found {
|
if !found {
|
||||||
log.Warnf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache! Deleting this item from Elastic. This error can probably be ignored.`, relPath)
|
log.Warnf(`ELASTICSEARCH - Could not fetch item "%s" from the LRU cache! Deleting this item from Elastic. This error can probably be ignored.`, relPath)
|
||||||
deleteFromElasticsearch(relPath)
|
deleteFromElasticsearch(encodeToBase64(relPath))
|
||||||
} else {
|
} else {
|
||||||
if _, ok := globalPathsByKey[relPath]; ok {
|
if _, ok := globalPathsByKey[relPath]; ok {
|
||||||
// Item already exists.
|
// Item already exists.
|
||||||
|
|
|
@ -4,19 +4,23 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crazyfs/config"
|
"crazyfs/config"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||||||
)
|
)
|
||||||
|
|
||||||
func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) {
|
func startRemoveStaleItemsFromElasticsearch(pathsByKey map[string]string) {
|
||||||
log.Debugln("ELASTIC - Checking for removed items...")
|
log.Debugln("ELASTIC - Checking for removed items...")
|
||||||
|
|
||||||
|
// TODO: use waitgroups here so we know when all the jobs are done and we can erase globalKeysByPath and globalPathsByKey
|
||||||
|
|
||||||
// For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache).
|
// For each key in Elasticsearch, create a job to check (and remove it if the key no longer exists in the cache).
|
||||||
for key, path := range pathsByKey {
|
for path, key := range pathsByKey {
|
||||||
Queue.AddJob(DeleteJob{Key: key, Path: path})
|
Queue.AddJob(DeleteJob{Key: key, Path: path})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteFromElasticsearch(key string) {
|
func deleteFromElasticsearch(key string) error {
|
||||||
req := esapi.DeleteRequest{
|
req := esapi.DeleteRequest{
|
||||||
Index: config.GetConfig().ElasticsearchIndex,
|
Index: config.GetConfig().ElasticsearchIndex,
|
||||||
DocumentID: key,
|
DocumentID: key,
|
||||||
|
@ -24,16 +28,19 @@ func deleteFromElasticsearch(key string) {
|
||||||
|
|
||||||
res, err := req.Do(context.Background(), ElasticClient)
|
res, err := req.Do(context.Background(), ElasticClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
|
|
||||||
// If we tried to delete a key that doesn't exist in Elastic, it will return an error that we will ignore.
|
// If we tried to delete a key that doesn't exist in Elastic, it will return an error.
|
||||||
if res.IsError() && res.StatusCode != 404 {
|
if res.IsError() && res.StatusCode != 404 {
|
||||||
var e map[string]interface{}
|
var e map[string]interface{}
|
||||||
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
|
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
|
||||||
log.Printf("Error parsing the response body: %s", err)
|
text := fmt.Sprintf("failed to parse the response body: %s", err)
|
||||||
|
return errors.New(text)
|
||||||
}
|
}
|
||||||
log.Errorf(`ELASTIC - Error deleting document "%s" - Status code: %d - %s`, key, res.StatusCode, e)
|
text := fmt.Sprintf(`Status code: %d - %s`, res.StatusCode, e)
|
||||||
|
return errors.New(text)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue