basic automated import tagging

This commit is contained in:
Cyberes 2024-07-07 14:42:09 -06:00
parent 93e2ef61fc
commit 0338595141
12 changed files with 96 additions and 17 deletions

View File

@ -0,0 +1,3 @@
Test Accounts:
admin:hei8iWae

View File

@ -1,4 +1,5 @@
import logging import logging
import sys
import threading import threading
import time import time
@ -7,6 +8,7 @@ from geo_lib.daemon.workers.importer import import_worker
from geo_lib.redis import flush_redis from geo_lib.redis import flush_redis
logging.basicConfig(level=logging.INFO) # TODO: config level logging.basicConfig(level=logging.INFO) # TODO: config level
_logger = logging.getLogger("DAEMON")
flush_redis() flush_redis()
@ -15,6 +17,11 @@ Database.initialise(minconn=1, maxconn=100, host='h.postgres.nb', database='geob
import_thread = threading.Thread(target=import_worker) import_thread = threading.Thread(target=import_worker)
import_thread.start() import_thread.start()
_logger.info('Started importer')
while True: while True:
time.sleep(3600) try:
time.sleep(3600)
except KeyboardInterrupt:
# TODO: shut down workers
sys.exit(0)

View File

@ -5,7 +5,7 @@ from django.db import models
class ImportQueue(models.Model): class ImportQueue(models.Model):
id = models.AutoField(primary_key=True) id = models.AutoField(primary_key=True)
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE) user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
geojson = models.JSONField(default=dict) geofeatures = models.JSONField(default=dict)
original_filename = models.TextField() original_filename = models.TextField()
raw_kml = models.TextField() raw_kml = models.TextField()
raw_kml_hash = models.CharField(max_length=64, unique=True) raw_kml_hash = models.CharField(max_length=64, unique=True)

View File

@ -70,21 +70,21 @@ def fetch_import_queue(request, id):
if queue.user_id != request.user.id: if queue.user_id != request.user.id:
return JsonResponse({'success': False, 'msg': 'not authorized to view this item', 'code': 403}, status=400) return JsonResponse({'success': False, 'msg': 'not authorized to view this item', 'code': 403}, status=400)
if len(queue.geojson): if len(queue.geojson):
return JsonResponse({'success': True, 'geojson': queue.geojson}, status=200) return JsonResponse({'success': True, 'geofeatures': queue.geojson}, status=200)
return JsonResponse({'success': True, 'geojson': {}, 'msg': 'uploaded data still processing'}, status=200) return JsonResponse({'success': True, 'geofeatures': {}, 'msg': 'uploaded data still processing'}, status=200)
except ImportQueue.DoesNotExist: except ImportQueue.DoesNotExist:
return JsonResponse({'success': False, 'msg': 'ID does not exist', 'code': 404}, status=400) return JsonResponse({'success': False, 'msg': 'ID does not exist', 'code': 404}, status=400)
@login_required_401 @login_required_401
def fetch_queued(request): def fetch_queued(request):
user_items = ImportQueue.objects.filter(user=request.user).values('id', 'geojson', 'original_filename', 'raw_kml_hash', 'data', 'timestamp') user_items = ImportQueue.objects.filter(user=request.user).values('id', 'geofeatures', 'original_filename', 'raw_kml_hash', 'data', 'timestamp')
data = json.loads(json.dumps(list(user_items), cls=DjangoJSONEncoder)) data = json.loads(json.dumps(list(user_items), cls=DjangoJSONEncoder))
for i, item in enumerate(data): for i, item in enumerate(data):
count = len(item['geojson'].get('features', [])) count = len(item['geofeatures'])
item['processing'] = len(item['geojson']) == 0 item['processing'] = len(item['geofeatures']) == 0
item['feature_count'] = count item['feature_count'] = count
del item['geojson'] del item['geofeatures']
return JsonResponse({'data': data}) return JsonResponse({'data': data})

2
src/geo-backend/dev.sh Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
./manage.py runserver

View File

@ -8,12 +8,14 @@ from psycopg2.extras import RealDictCursor
from geo_lib.daemon.database.connection import CursorFromConnectionFromPool from geo_lib.daemon.database.connection import CursorFromConnectionFromPool
from geo_lib.daemon.database.locking import DBLockManager from geo_lib.daemon.database.locking import DBLockManager
from geo_lib.daemon.workers.workers_lib.importer.tagging import generate_auto_tags
from geo_lib.logging.database import log_to_db, DatabaseLogLevel, DatabaseLogSource from geo_lib.logging.database import log_to_db, DatabaseLogLevel, DatabaseLogSource
from geo_lib.spatial.kml import kml_to_geojson from geo_lib.spatial.kml import kml_to_geojson
from geo_lib.time import get_time_ms from geo_lib.time import get_time_ms
from geo_lib.types.item import geojson_to_geofeature
_SQL_GET_UNPROCESSED_ITEMS = "SELECT * FROM public.data_importqueue WHERE geojson = '{}' ORDER BY id ASC" _SQL_GET_UNPROCESSED_ITEMS = "SELECT * FROM public.data_importqueue WHERE geofeatures = '{}' ORDER BY id ASC"
_SQL_INSERT_PROCESSED_ITEM = "UPDATE public.data_importqueue SET geojson = %s WHERE id = %s" _SQL_INSERT_PROCESSED_ITEM = "UPDATE public.data_importqueue SET geofeatures = %s WHERE id = %s"
_SQL_DELETE_ITEM = "DELETE FROM public.data_importqueue WHERE id = %s" _SQL_DELETE_ITEM = "DELETE FROM public.data_importqueue WHERE id = %s"
_logger = logging.getLogger("DAEMON").getChild("IMPORTER") _logger = logging.getLogger("DAEMON").getChild("IMPORTER")
@ -39,8 +41,12 @@ def import_worker():
for item in queue: for item in queue:
start = get_time_ms() start = get_time_ms()
success = False success = False
geofetures = []
try: try:
geojson_data, messages = kml_to_geojson(item['raw_kml']) geojson_data, messages = kml_to_geojson(item['raw_kml'])
geofetures = geojson_to_geofeature(geojson_data)
for feature in geofetures:
feature.tags = generate_auto_tags(feature)
success = True success = True
except Exception as e: except Exception as e:
err_name = e.__class__.__name__ err_name = e.__class__.__name__
@ -51,15 +57,15 @@ def import_worker():
log_to_db(msg, level=DatabaseLogLevel.ERROR, user_id=item['user_id'], source=DatabaseLogSource.IMPORT) log_to_db(msg, level=DatabaseLogLevel.ERROR, user_id=item['user_id'], source=DatabaseLogSource.IMPORT)
traceback.print_exc() traceback.print_exc()
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor: with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
# TODO: log error
cursor.execute(_SQL_DELETE_ITEM, (item['id'],)) cursor.execute(_SQL_DELETE_ITEM, (item['id'],))
if success: if success:
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor: with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
cursor.execute(_SQL_INSERT_PROCESSED_ITEM, (json.dumps(geojson_data, sort_keys=True), item['id'])) data = json.dumps([json.loads(x.model_dump_json()) for x in geofetures])
cursor.execute(_SQL_INSERT_PROCESSED_ITEM, (data, item['id']))
_logger.info(f'IMPORT: processed #{item["id"]} in {round((get_time_ms() - start) / 1000, 2)} seconds -- {worker_id}') _logger.info(f'IMPORT: processed #{item["id"]} in {round((get_time_ms() - start) / 1000, 2)} seconds -- {worker_id}')
lock_manager.unlock_row('data_importqueue', item['id']) lock_manager.unlock_row('data_importqueue', item['id'])
if not len(queue): if not len(queue):
# Only sleep if there were no items last time we checked. # Only sleep if there were no items last time we checked.
time.sleep(5) time.sleep(5)
# def _process_item_data(item)

View File

@ -0,0 +1,13 @@
from datetime import datetime
from typing import List
from geo_lib.types.item import GeoFeature
def generate_auto_tags(feature: GeoFeature) -> List[str]:
tags = []
tags.append(f'type:{feature.geometry.type.value}')
now = datetime.now()
tags.append(f'year:{now.year}')
tags.append(f'month:{now.strftime("%B")}')
return [str(x) for x in tags]

View File

@ -1,9 +1,10 @@
from typing import Optional from typing import Optional, List
from pydantic import BaseModel from pydantic import BaseModel, Field
class GeojsonRawProperty(BaseModel): class GeojsonRawProperty(BaseModel):
# Whitelist these properties. # A class to whitelist these properties.
name: str name: str
description: Optional[str] = None description: Optional[str] = None
feature_tags: List[str] = Field(default_factory=list)

View File

@ -0,0 +1,44 @@
from enum import Enum
from typing import Tuple, Optional, List
from pydantic import Field, BaseModel
class GeoFeatureType(Enum):
POINT = 'Point'
LINESTRING = 'LineString'
POLYGON = 'Polygon'
class GeoFeatureGeomoetry(BaseModel):
type: GeoFeatureType
coordinates: List[float] | List[List[float]] | List[List[List[float]]]
class GeoFeature(BaseModel):
"""
A thing that's shown on the map.
Can be a point, linestring, or polygon.
"""
name: str
id: int # From the database
type: GeoFeatureType
description: Optional[str] = None
tags: List[str] = Field(default_factory=list)
geometry: GeoFeatureGeomoetry
def geojson_to_geofeature(geojson: dict) -> List[GeoFeature]:
result = []
for item in geojson['features']:
result.append(
GeoFeature(
name=item['properties']['title'],
id=-1, # This will be updated after it's added to the main data store.
type=GeoFeatureType(item['geometry']['type']),
description=item['properties']['description'],
tags=item['properties']['feature_tags'],
geometry=GeoFeatureGeomoetry(**item['geometry'])
)
)
return result

View File

@ -1,5 +1,5 @@
from django.contrib.auth import login from django.contrib.auth import login
from django.shortcuts import render from django.shortcuts import render, redirect
from users.forms import CustomUserCreationForm from users.forms import CustomUserCreationForm
@ -15,3 +15,6 @@ def register(request):
if form.is_valid(): if form.is_valid():
user = form.save() user = form.save()
login(request, user) login(request, user)
return redirect('/account/login/')
else:
return render(request, "users/register.html", {"form": form}) # return the form with errors