improve import worker, add queue deleting

This commit is contained in:
Cyberes 2024-06-11 16:06:39 -06:00
parent feed8e5d15
commit 69b13e4eb6
11 changed files with 135 additions and 52 deletions

View File

@ -1,8 +1,14 @@
import logging
import threading
import time
from geo_lib.daemon.database.connection import Database
from geo_lib.daemon.workers.importer import import_worker
from geo_lib.redis import flush_redis
logging.basicConfig(level=logging.INFO) # TODO: config level
flush_redis()
# TODO: config
Database.initialise(minconn=1, maxconn=100, host='h.postgres.nb', database='geobackend', user='geobackend', password='juu1waigu1pookee1ohcierahMoofie3')

View File

@ -10,13 +10,4 @@ class ImportQueue(models.Model):
raw_kml = models.TextField()
raw_kml_hash = models.CharField(max_length=64, unique=True)
data = models.JSONField(default=dict)
worker_lock = models.TextField(default=None, null=True)
timestamp = models.DateTimeField(auto_now_add=True)
class GeoLogs(models.Model):
id = models.AutoField(primary_key=True)
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
text = models.JSONField()
source = models.TextField()
timestamp = models.DateTimeField(auto_now_add=True)

View File

@ -1,9 +1,10 @@
from django.urls import path
from data.views.import_item import upload_item, fetch_import_queue, fetch_queued
from data.views.import_item import upload_item, fetch_import_queue, fetch_queued, delete_import_queue
urlpatterns = [
path('item/import/upload/', upload_item, name='upload_file'),
path('item/import/get/<int:id>', fetch_import_queue, name='fetch_import_queue'),
path('item/import/get/mine', fetch_queued, name='fetch_queued'),
path('item/import/delete/<int:id>', delete_import_queue, name='delete_import_queue'),
]

View File

@ -81,12 +81,24 @@ def fetch_queued(request):
user_items = ImportQueue.objects.filter(user=request.user).values('id', 'geojson', 'original_filename', 'raw_kml_hash', 'data', 'timestamp')
data = json.loads(json.dumps(list(user_items), cls=DjangoJSONEncoder))
for i, item in enumerate(data):
count = len(item['geojson']['features'])
count = len(item['geojson'].get('features', []))
del item['geojson']
item['feature_count'] = count
return JsonResponse({'data': data})
@login_required_401
def delete_import_queue(request, id):
if request.method == 'DELETE':
try:
queue = ImportQueue.objects.get(id=id)
except ImportQueue.DoesNotExist:
return JsonResponse({'success': False, 'msg': 'ID does not exist', 'code': 404}, status=400)
queue.delete()
return JsonResponse({'success': True})
return HttpResponse(status=405)
def _hash_kml(b: str):
if not isinstance(b, bytes):
b = b.encode()

View File

@ -0,0 +1,34 @@
import threading
import redis
from redis.exceptions import LockError
class DBLockManager:
_redis = redis.Redis(host='localhost', port=6379, db=0)
locks = {}
locks_lock = threading.Lock()
def __init__(self, worker_id):
self.worker_id = worker_id
def lock_row(self, table_name: str, primary_key):
lock = self._redis.lock(f'database_lock_{table_name}:{primary_key}')
if lock.acquire(blocking=False):
with self.locks_lock:
self.locks[f'{table_name}:{primary_key}'] = lock
return True
else:
return False
def unlock_row(self, table_name: str, primary_key):
with self.locks_lock:
lock = self.locks.get(f'{table_name}:{primary_key}')
if lock:
try:
lock.release()
return True
except LockError:
return False
else:
return False

View File

@ -1,4 +1,5 @@
import json
import logging
import time
import traceback
from uuid import uuid4
@ -6,44 +7,58 @@ from uuid import uuid4
from psycopg2.extras import RealDictCursor
from geo_lib.daemon.database.connection import CursorFromConnectionFromPool
from geo_lib.logging.database import log_to_db, DatabaseLogLevel
from geo_lib.daemon.database.locking import DBLockManager
from geo_lib.logging.database import log_to_db, DatabaseLogLevel, DatabaseLogSource
from geo_lib.spatial.kml import kml_to_geojson
from geo_lib.time import get_time_ms
_SQL_GET_UNPROCESSED_ITEMS = "SELECT * FROM public.data_importqueue ORDER BY id ASC" # coordinates
_SQL_GET_UNPROCESSED_ITEMS = "SELECT * FROM public.data_importqueue WHERE geojson = '{}' ORDER BY id ASC"
_SQL_INSERT_PROCESSED_ITEM = "UPDATE public.data_importqueue SET geojson = %s WHERE id = %s"
_SQL_DELETE_ITEM = "DELETE FROM public.data_importqueue WHERE id = %s"
_logger = logging.getLogger("DAEMON").getChild("IMPORTER")
# TODO: support multiple workers
def import_worker():
worker_id = str(uuid4())
lock_manager = DBLockManager(worker_id=worker_id)
while True:
queue = []
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
cursor.execute(_SQL_GET_UNPROCESSED_ITEMS)
import_queue_items = cursor.fetchall()
for item in import_queue_items:
if lock_manager.lock_row('data_importqueue', item['id']):
queue.append(item)
if len(import_queue_items):
print(f'IMPORT: processing {len(import_queue_items)} items') # TODO: logging, also log worker ID
if len(queue):
_logger.info(f'processing {len(import_queue_items)} items -- {worker_id}')
for item in import_queue_items:
for item in queue:
start = get_time_ms()
success = False
try:
geojson_data, messages = kml_to_geojson(item['raw_kml'])
success = True
except Exception as e:
err_name = e.__class__.__name__
err_msg = str(e)
if hasattr(e, 'message'):
err_msg = e.message
msg = f'Failed to import item #{item["id"]}, encountered {err_name}. {err_msg}'
log_to_db(msg, level=DatabaseLogLevel.ERROR, user_id=item['user_id'])
msg = f'Failed to import item #{item["id"]} "{item["original_filename"]}", encountered {err_name}. {err_msg}'
log_to_db(msg, level=DatabaseLogLevel.ERROR, user_id=item['user_id'], source=DatabaseLogSource.IMPORT)
traceback.print_exc()
continue
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
cursor.execute(_SQL_INSERT_PROCESSED_ITEM, (json.dumps(geojson_data, sort_keys=True), item['id']))
print(f'IMPORT: processed #{item["id"]} in {round((get_time_ms() - start) / 1000, 2)} seconds') # TODO: logging, also log worker ID
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
cursor.execute(_SQL_DELETE_ITEM, (item['id'],))
if success:
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
cursor.execute(_SQL_INSERT_PROCESSED_ITEM, (json.dumps(geojson_data, sort_keys=True), item['id']))
_logger.info(f'IMPORT: processed #{item["id"]} in {round((get_time_ms() - start) / 1000, 2)} seconds -- {worker_id}')
lock_manager.unlock_row('data_importqueue', item['id'])
if not len(import_queue_items):
if not len(queue):
# Only sleep if there were no items last time we checked.
time.sleep(5)

View File

@ -6,6 +6,8 @@ from pydantic import BaseModel
from geo_lib.daemon.database.connection import CursorFromConnectionFromPool
_logger = logging.getLogger("MAIN").getChild("DBLOG")
class DatabaseLogLevel(Enum):
DEBUG = logging.DEBUG
@ -15,18 +17,20 @@ class DatabaseLogLevel(Enum):
CRITICAL = logging.CRITICAL
class DatabaseLogSource(Enum):
IMPORT = "import"
class DatabaseLogItem(BaseModel):
# Using an object so we can add arbitrary data if nessesary.
msg: str
level: DatabaseLogLevel
_SQL_INSERT_ITEM = 'INSERT INTO public.data_geologs (user_id, text, source) VALUES (%s, %s, %s)'
_SQL_INSERT_LOG_ITEM = "INSERT INTO public.geologs (user_id, level, text, source) VALUES (%s, %s, %s, %s)"
def log_to_db(msg: str, level: DatabaseLogLevel, user_id: int, source: str):
print(msg)
return
# TODO:
def log_to_db(msg: str, level: DatabaseLogLevel, user_id: int, source: DatabaseLogSource):
_logger.log(level.value, msg)
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
cursor.execute(_SQL_INSERT_ITEM, (user_id, DatabaseLogItem(msg=msg, level=level).json()))
cursor.execute(_SQL_INSERT_LOG_ITEM, (user_id, level.value, msg, source.value))

View File

@ -0,0 +1,6 @@
import redis
def flush_redis():
r = redis.Redis(host='localhost', port=6379, db=0)
r.flushall()

View File

@ -5,4 +5,7 @@ lxml==5.2.2
kml2geojson==5.1.0
dateparser==1.2.0
geojson==3.1.0
pydantic==2.7.3
pydantic==2.7.3
sqlalchemy==2.0.30
redis==5.0.5
async_timeout==4.0.3

View File

@ -9,7 +9,7 @@
<div class="relative w-[90%] m-auto">
<div>
<input :disabled="disableUpload" type="file" @change="onFileChange">
<input id="uploadInput" :disabled="disableUpload" type="file" @change="onFileChange">
<button :disabled="disableUpload" @click="upload">Upload</button>
</div>
</div>
@ -28,7 +28,9 @@
<tr v-for="(item, index) in processQueue" :key="`item-${index}`">
<td><a :href="`/#/import/process/${item.id}`">{{ item.original_filename }}</a></td>
<td>{{ item.feature_count }}</td>
<td>button to delete from queue</td>
<td>
<button @click="deleteItem(item.id)">Delete</button>
</td>
</tr>
</tbody>
</table>
@ -61,28 +63,33 @@ export default {
this.file = e.target.files[0]
const fileType = this.file.name.split('.').pop().toLowerCase()
if (fileType !== 'kmz' && fileType !== 'kml') {
alert('Invalid file type. Only KMZ and KML files are allowed.')
e.target.value = '' // Reset the input value
alert('Invalid file type. Only KMZ and KML files are allowed.') // TODO: have this be a message on the page?
e.target.value = "" // Reset the input value
}
},
upload() {
async upload() {
this.uploadMsg = ""
if (this.file == null) {
return
}
let formData = new FormData()
formData.append('file', this.file)
axios.post('/api/data/item/import/upload/', formData, {
headers: {
'Content-Type': 'multipart/form-data',
'X-CSRFToken': this.userInfo.csrftoken
}
}).then(response => {
try {
const response = await axios.post('/api/data/item/import/upload/', formData, {
headers: {
'Content-Type': 'multipart/form-data',
'X-CSRFToken': this.userInfo.csrftoken
}
})
this.uploadMsg = `<p>${capitalizeFirstLetter(response.data.msg).trim(".")}.</p><p><a href="/#/import/process/${response.data.id}">Continue to Import</a>`
this.disableUpload = true
}).catch(error => {
await this.fetchQueueList()
this.file = null
document.getElementById("uploadInput").value = ""
this.disableUpload = false
} catch (error) {
this.handleError(error)
})
}
},
handleError(error) {
console.error("Upload failed:", error)
@ -93,6 +100,18 @@ export default {
async fetchQueueList() {
const response = await axios.get('/api/data/item/import/get/mine')
this.processQueue = response.data.data
},
async deleteItem(id) {
try {
const response = await axios.delete('/api/data/item/import/delete/' + id, {
headers: {
'X-CSRFToken': this.userInfo.csrftoken
}
})
await this.fetchQueueList()
} catch (error) {
alert(`Failed to delete ${id}: ${error.message}`)
}
}
},
async created() {
@ -112,13 +131,4 @@ export default {
</script>
<style scoped>
.overlay {
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
background-color: rgba(0, 0, 0, 0.5);
z-index: 9999;
}
</style>