continue working on importing

Cyberes 2024-06-09 20:21:47 -06:00
parent b76f0ee951
commit 700e518e3e
32 changed files with 428 additions and 128 deletions

.gitignore

@@ -1,6 +1,5 @@
-.idea
 # ---> Python
 # Byte-compiled / optimized / DLL files
 __pycache__/
@@ -162,4 +161,3 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/

src/geo-backend/daemon.py (new file)

@@ -0,0 +1,14 @@
import threading
import time

from geo_lib.daemon.database.connection import Database
from geo_lib.daemon.workers.importer import import_worker

# TODO: config
Database.initialise(minconn=1, maxconn=100, host='h.postgres.nb', database='geobackend', user='geobackend', password='juu1waigu1pookee1ohcierahMoofie3')

import_thread = threading.Thread(target=import_worker)
import_thread.start()

while True:
    time.sleep(3600)
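
The credentials above are hardcoded pending the `# TODO: config`. A minimal sketch of what that step could look like, assuming a hypothetical config.ini with a [database] section (stdlib configparser, no new dependency):

import configparser

# Hypothetical: the config.ini path and [database] section are assumptions, not part of this commit.
config = configparser.ConfigParser()
config.read('config.ini')
db = config['database']
Database.initialise(minconn=1, maxconn=100, host=db['host'], database=db['database'], user=db['user'], password=db['password'])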

src/geo-backend/data/admin.py

@@ -0,0 +1,3 @@
from django.contrib import admin

# Register your models here.

src/geo-backend/data/apps.py

@@ -0,0 +1,6 @@
from django.apps import AppConfig


class DatamanageConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'data'

src/geo-backend/data/models.py

@@ -0,0 +1,22 @@
from django.contrib.auth import get_user_model
from django.db import models


class ImportQueue(models.Model):
    id = models.AutoField(primary_key=True)
    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    geojson = models.JSONField(default=dict)
    original_filename = models.TextField()
    raw_kml = models.TextField()
    raw_kml_hash = models.CharField(max_length=64, unique=True)
    data = models.JSONField(default=dict)
    worker_lock = models.TextField(default=None, null=True)
    timestamp = models.DateTimeField(auto_now_add=True)


class GeoLogs(models.Model):
    id = models.AutoField(primary_key=True)
    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    text = models.JSONField()
    source = models.TextField()
    timestamp = models.DateTimeField(auto_now_add=True)

src/geo-backend/data/tests.py

@@ -0,0 +1,3 @@
from django.test import TestCase

# Create your tests here.

src/geo-backend/data/urls.py

@@ -0,0 +1,9 @@
from django.urls import path

from data.views.import_item import upload_item, fetch_import_queue, fetch_queued

urlpatterns = [
    path('item/import/upload/', upload_item, name='upload_file'),
    path('item/import/get/<int:id>', fetch_import_queue, name='fetch_import_queue'),
    path('item/import/get/mine', fetch_queued, name='fetch_queued'),
]

src/geo-backend/data/views.py

@@ -0,0 +1,3 @@
from django.shortcuts import render

# Create your views here.

src/geo-backend/data/views/__init__.py

@@ -0,0 +1 @@
from .import_item import *

src/geo-backend/data/views/import_item.py

@@ -0,0 +1,93 @@
import hashlib
import json
import traceback

from django import forms
from django.core.serializers.json import DjangoJSONEncoder
from django.db import IntegrityError
from django.http import HttpResponse, JsonResponse

from data.models import ImportQueue
from geo_lib.spatial.kml import kmz_to_kml
from geo_lib.website.auth import login_required_401


class DocumentForm(forms.Form):
    file = forms.FileField()


@login_required_401
def upload_item(request):
    if request.method == 'POST':
        form = DocumentForm(request.POST, request.FILES)
        if form.is_valid():
            uploaded_file = request.FILES['file']

            # Check file size
            # TODO: config??
            if uploaded_file.size > 100 * 1024 * 1024:  # file size limit 100MB
                return JsonResponse({'success': False, 'msg': 'File size must be less than 100MB', 'id': None}, status=400)

            file_data = uploaded_file.read()
            file_name = uploaded_file.name
            try:
                kml_doc = kmz_to_kml(file_data)
            except Exception as e:
                print(traceback.format_exc())  # TODO: logging
                return JsonResponse({'success': False, 'msg': 'failed to parse KML/KMZ', 'id': None}, status=400)
            try:
                import_queue, created = ImportQueue.objects.get_or_create(raw_kml=kml_doc, raw_kml_hash=_hash_kml(kml_doc), original_filename=file_name, user=request.user)
                import_queue.save()
            except IntegrityError:
                created = False
                import_queue = ImportQueue.objects.get(
                    raw_kml=kml_doc,
                    raw_kml_hash=_hash_kml(kml_doc),
                    # original_filename=file_name,
                    user=request.user
                )
            msg = 'upload successful'
            if not created:
                msg = 'data already exists in the import queue'
            return JsonResponse({'success': True, 'msg': msg, 'id': import_queue.id}, status=200)
            # TODO: put the processed data into the database and then return the ID so the frontend can go to the import page and use the ID to start the import
        else:
            return JsonResponse({'success': False, 'msg': 'invalid upload structure', 'id': None}, status=400)
    else:
        return HttpResponse(status=405)


@login_required_401
def fetch_import_queue(request, id):
    if id is None:
        return JsonResponse({'success': False, 'msg': 'ID not provided', 'code': 400}, status=400)
    try:
        queue = ImportQueue.objects.get(id=id)
        if queue.user_id != request.user.id:
            return JsonResponse({'success': False, 'msg': 'not authorized to view this item', 'code': 403}, status=400)
        if len(queue.geojson):
            return JsonResponse({'success': True, 'geojson': queue.geojson}, status=200)
        return JsonResponse({'success': True, 'geojson': {}, 'msg': 'uploaded data still processing'}, status=200)
    except ImportQueue.DoesNotExist:
        return JsonResponse({'success': False, 'msg': 'ID does not exist', 'code': 404}, status=400)


@login_required_401
def fetch_queued(request):
    user_items = ImportQueue.objects.filter(user=request.user).values('id', 'geojson', 'original_filename', 'raw_kml_hash', 'data', 'timestamp')
    data = json.loads(json.dumps(list(user_items), cls=DjangoJSONEncoder))
    for i, item in enumerate(data):
        # geojson is {} until the worker has processed the upload.
        count = len(item['geojson'].get('features', []))
        del item['geojson']
        item['feature_count'] = count
    return JsonResponse({'data': data})


def _hash_kml(b):
    if not isinstance(b, bytes):
        b = b.encode()
    return hashlib.sha256(b).hexdigest()
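
For manual testing, the upload endpoint can be exercised outside the browser. A rough sketch using requests (not in requirements.txt; the base URL and the already-authenticated session are assumptions, since the view sits behind login_required_401 and Django's CSRF check):

import requests

session = requests.Session()
# Assumes the session already holds valid auth + csrftoken cookies from a prior login.
csrf_token = session.cookies.get('csrftoken', '')
with open('example.kmz', 'rb') as f:
    resp = session.post(
        'http://localhost:8000/api/data/item/import/upload/',
        files={'file': f},
        headers={'X-CSRFToken': csrf_token},
    )
print(resp.json())  # e.g. {'success': True, 'msg': 'upload successful', 'id': 1}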

src/geo-backend/geo_backend/settings.py

@@ -18,6 +18,7 @@ BASE_DIR = Path(__file__).resolve().parent.parent
 # Quick-start development settings - unsuitable for production
 # See https://docs.djangoproject.com/en/5.0/howto/deployment/checklist/

+# TODO: user config
 # SECURITY WARNING: keep the secret key used in production secret!
 SECRET_KEY = 'django-insecure-f(1zo%f)wm*rl97q0^3!9exd%(s8mz92nagf4q7c2cno&bmyx='
@@ -30,6 +31,7 @@ ALLOWED_HOSTS = []
 INSTALLED_APPS = [
     'users',
+    'data',
     'django.contrib.admin',
     'django.contrib.auth',
     'django.contrib.contenttypes',
@@ -73,6 +75,7 @@ WSGI_APPLICATION = 'geo_backend.wsgi.application'
 # https://docs.djangoproject.com/en/5.0/ref/settings/#databases
+# TODO: user config
 DATABASES = {
     'default': {
         'ENGINE': 'django.db.backends.postgresql',

src/geo-backend/geo_backend/urls.py

@@ -25,4 +25,5 @@ urlpatterns = [
     re_path(r"^account/", include("django.contrib.auth.urls")),
     path('admin/', admin.site.urls),
     path('', include("users.urls")),
+    path('api/data/', include("data.urls"))
 ]

src/geo-backend/geo_lib/daemon/database/connection.py

@@ -0,0 +1,39 @@
from psycopg2 import pool


class Database:
    __connection_pool = None

    @classmethod
    def initialise(cls, minconn, maxconn, **kwargs):
        if cls.__connection_pool is not None:
            raise Exception('Database connection pool is already initialised')
        cls.__connection_pool = pool.ThreadedConnectionPool(minconn, maxconn, **kwargs)

    @classmethod
    def get_connection(cls):
        return cls.__connection_pool.getconn()

    @classmethod
    def return_connection(cls, connection):
        cls.__connection_pool.putconn(connection)


class CursorFromConnectionFromPool:
    def __init__(self, cursor_factory=None):
        self.conn = None
        self.cursor = None
        self.cursor_factory = cursor_factory

    def __enter__(self):
        self.conn = Database.get_connection()
        self.cursor = self.conn.cursor(cursor_factory=self.cursor_factory)
        return self.cursor

    def __exit__(self, exception_type, exception_value, exception_traceback):
        if exception_value is not None:  # an exception was raised inside the with-block
            self.conn.rollback()
        else:
            self.cursor.close()
            self.conn.commit()
        Database.return_connection(self.conn)
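
Usage is a with-block: per __exit__ above, the transaction commits on a clean exit and rolls back if the block raises. A minimal sketch (connection parameters are placeholders):

from psycopg2.extras import RealDictCursor

from geo_lib.daemon.database.connection import Database, CursorFromConnectionFromPool

Database.initialise(minconn=1, maxconn=10, host='localhost', database='geobackend', user='geobackend', password='...')  # once per process
with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
    cursor.execute('SELECT 1 AS ok')
    print(cursor.fetchone())  # {'ok': 1}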

src/geo-backend/geo_lib/daemon/workers/importer.py

@@ -0,0 +1,50 @@
import json
import time
import traceback
from uuid import uuid4

from psycopg2.extras import RealDictCursor

from geo_lib.daemon.database.connection import CursorFromConnectionFromPool
from geo_lib.logging.database import log_to_db, DatabaseLogLevel
from geo_lib.spatial.kml import kml_to_geojson
from geo_lib.time import get_time_ms

# geojson defaults to {} until the worker fills it in, so an empty value marks an unprocessed item.
_SQL_GET_UNPROCESSED_ITEMS = "SELECT * FROM public.data_importqueue WHERE geojson = '{}'::jsonb ORDER BY id ASC"
_SQL_INSERT_PROCESSED_ITEM = "UPDATE public.data_importqueue SET geojson = %s WHERE id = %s"


# TODO: support multiple workers
def import_worker():
    worker_id = str(uuid4())
    while True:
        with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
            cursor.execute(_SQL_GET_UNPROCESSED_ITEMS)
            import_queue_items = cursor.fetchall()
        if len(import_queue_items):
            print(f'IMPORT: processing {len(import_queue_items)} items')  # TODO: logging, also log worker ID
        for item in import_queue_items:
            start = get_time_ms()
            try:
                geojson_data, messages = kml_to_geojson(item['raw_kml'])
            except Exception as e:
                err_name = e.__class__.__name__
                err_msg = str(e)
                if hasattr(e, 'message'):
                    err_msg = e.message
                msg = f'Failed to import item #{item["id"]}, encountered {err_name}. {err_msg}'
                log_to_db(msg, level=DatabaseLogLevel.ERROR, user_id=item['user_id'], source='importer')
                traceback.print_exc()
                continue
            with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(_SQL_INSERT_PROCESSED_ITEM, (json.dumps(geojson_data, sort_keys=True), item['id']))
            print(f'IMPORT: processed #{item["id"]} in {round((get_time_ms() - start) / 1000, 2)} seconds')  # TODO: logging, also log worker ID
        if not len(import_queue_items):
            # Only sleep if there were no items last time we checked.
            time.sleep(5)

# def _process_item_data(item)
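
One possible shape for the "TODO: support multiple workers", using the worker_lock column from ImportQueue and the worker_id generated above. This is an illustrative sketch, not what the commit implements: claim a single row atomically with FOR UPDATE SKIP LOCKED so concurrent daemons never grab the same item.

# Hypothetical claim query; assumes unprocessed rows still have geojson = '{}'.
_SQL_CLAIM_ITEM = """
    UPDATE public.data_importqueue SET worker_lock = %s
    WHERE id = (
        SELECT id FROM public.data_importqueue
        WHERE worker_lock IS NULL AND geojson = '{}'::jsonb
        ORDER BY id ASC LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING *
"""

def claim_next_item(worker_id: str):
    with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(_SQL_CLAIM_ITEM, (worker_id,))
        return cursor.fetchone()  # None when nothing is left to claim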

src/geo-backend/geo_lib/logging/database.py

@@ -0,0 +1,32 @@
import logging
from enum import Enum

from psycopg2.extras import RealDictCursor
from pydantic import BaseModel

from geo_lib.daemon.database.connection import CursorFromConnectionFromPool


class DatabaseLogLevel(Enum):
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR
    CRITICAL = logging.CRITICAL


class DatabaseLogItem(BaseModel):
    # Using an object so we can add arbitrary data if necessary.
    msg: str
    level: DatabaseLogLevel


_SQL_INSERT_ITEM = 'INSERT INTO public.data_geologs (user_id, text, source) VALUES (%s, %s, %s)'


def log_to_db(msg: str, level: DatabaseLogLevel, user_id: int, source: str):
    print(msg)
    return
    # TODO: remove the early return once database logging is ready.
    with CursorFromConnectionFromPool(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(_SQL_INSERT_ITEM, (user_id, DatabaseLogItem(msg=msg, level=level).json(), source))

src/geo-backend/geo_lib/spatial/kml.py

@@ -1,58 +1,83 @@
 import io
-import re
+import json
 import zipfile
+from typing import Union, Tuple

-from fastkml import kml
-from shapely.geometry import mapping, shape
+import geojson
+import kml2geojson
+from dateparser import parse
+from geojson import FeatureCollection, Point, LineString, Polygon

+from geo_lib.types.geojson import GeojsonRawProperty

-# TODO: preserve KML object styling, such as color and opacity
-def kml_to_geojson(kml_bytes):
+def kmz_to_kml(kml_bytes: Union[str, bytes]) -> str:
+    if isinstance(kml_bytes, str):
+        kml_bytes = kml_bytes.encode('utf-8')
     try:
         # Try to open as a zipfile (KMZ)
         with zipfile.ZipFile(io.BytesIO(kml_bytes), 'r') as kmz:
             # Find the first .kml file in the zipfile
             kml_file = [name for name in kmz.namelist() if name.endswith('.kml')][0]
-            doc = kmz.read(kml_file).decode('utf-8')
+            return kmz.read(kml_file).decode('utf-8')
     except zipfile.BadZipFile:
         # If not a zipfile, assume it's a KML file
-        doc = kml_bytes.decode('utf-8')
+        return kml_bytes.decode('utf-8')

-    # Remove XML declaration
-    doc = re.sub(r'<\?xml.*\?>', '', doc)
-    k = kml.KML()
-    k.from_string(doc)
-    features = []
-    process_feature(features, k)
-    return {
+
+def kml_to_geojson(kml_bytes) -> Tuple[dict, list]:
+    # TODO: preserve KML object styling, such as color and opacity
+    doc = kmz_to_kml(kml_bytes)
+    converted_kml = kml2geojson.main.convert(io.BytesIO(doc.encode('utf-8')))
+    features, messages = process_feature(converted_kml)
+    data = {
         'type': 'FeatureCollection',
         'features': features
     }
+    return load_geojson_type(data), messages

-def process_feature(features, feature):
-    # Recursive function to handle folders within folders
-    if isinstance(feature, (kml.Document, kml.Folder, kml.KML)):
-        for child in feature.features():
-            process_feature(features, child)
-    elif isinstance(feature, kml.Placemark):
-        geom = shape(feature.geometry)
-        # Only keep points, lines and polygons
-        if geom.geom_type in ['Point', 'LineString', 'Polygon']:
-            features.append({
-                'type': 'Feature',
-                'properties': {
-                    'name': feature.name,
-                    'description': feature.description,
-                },
-                'geometry': mapping(geom),
-            })
-            if feature.extended_data is not None:
-                features['properties'].update(feature.extended_data)
-    else:
-        # raise ValueError(f'Unknown feature: {type(feature)}')
-        pass

+def process_feature(converted_kml):
+    features = []
+    messages = []
+    for feature in converted_kml[0]['features']:
+        if feature['geometry']['type'] in ['Point', 'LineString', 'Polygon']:
+            if feature['properties'].get('times'):
+                for i, timestamp_str in enumerate(feature['properties']['times']):
+                    timestamp = int(parse(timestamp_str).timestamp() * 1000)
+                    feature['geometry']['coordinates'][i].append(timestamp)
+            feature['properties'] = GeojsonRawProperty(**feature['properties']).dict()
+            features.append(feature)
+        else:
+            # Log the error
+            messages.append(f'Feature type {feature["geometry"]["type"]} not supported')
+    return features, messages

+def load_geojson_type(data: dict) -> dict:
+    features = []
+    for feature in data['features']:
+        if feature['geometry']['type'] == 'Point':
+            features.append(
+                Point(coordinates=feature['geometry']['coordinates'], properties=feature['properties'])
+            )
+        elif feature['geometry']['type'] == 'LineString':
+            features.append(
+                LineString(coordinates=feature['geometry']['coordinates'], properties=feature['properties'])
+            )
+        elif feature['geometry']['type'] == 'Polygon':
+            features.append(
+                Polygon(coordinates=feature['geometry']['coordinates'], properties=feature['properties'])
+            )
+    collection = FeatureCollection(features)
+    geojson_dict = json.loads(geojson.dumps(collection, sort_keys=True))
+    for item in geojson_dict['features']:
+        item['geometry'] = {
+            'type': item.pop('type'),
+            'coordinates': item.pop('coordinates'),
+        }
+        item['type'] = 'Feature'
+        item['properties']['title'] = item['properties'].pop('name')
+    return geojson_dict
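
A quick sanity check of the new conversion path (the sample KML and expected output are illustrative):

_SAMPLE_KML = '''<?xml version="1.0" encoding="UTF-8"?>
<kml xmlns="http://www.opengis.net/kml/2.2">
  <Document>
    <Placemark>
      <name>Test point</name>
      <Point><coordinates>-105.0,40.0,0</coordinates></Point>
    </Placemark>
  </Document>
</kml>'''

geojson_data, messages = kml_to_geojson(_SAMPLE_KML)
print(len(geojson_data['features']), messages)  # expected: 1 []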

src/geo-backend/geo_lib/spatial/tags.py

@@ -0,0 +1,2 @@
# def generate_tags(geojson: dict):

src/geo-backend/geo_lib/time.py

@@ -0,0 +1,5 @@
import time


def get_time_ms():
    return time.time_ns() // 1_000_000

src/geo-backend/geo_lib/types/geojson.py

@@ -0,0 +1,9 @@
from typing import Optional

from pydantic import BaseModel


class GeojsonRawProperty(BaseModel):
    # Whitelist these properties.
    name: str
    description: Optional[str] = None
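
Since pydantic ignores undeclared fields by default, anything else kml2geojson puts in properties is dropped when process_feature rebuilds them. A small sketch:

props = GeojsonRawProperty(name='Trailhead', times=['2024-01-01T00:00:00Z']).dict()
print(props)  # {'name': 'Trailhead', 'description': None}; 'times' is not declared, so it is ignored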

src/geo-backend/geo_lib/website/auth.py

@@ -1,6 +1,6 @@
 from functools import wraps

-from django.http import HttpResponse
+from django.http import JsonResponse


 def login_required_401(view_func):
@@ -9,6 +9,6 @@ def login_required_401(view_func):
         if request.user.is_authenticated:
             return view_func(request, *args, **kwargs)
         else:
-            return HttpResponse('Unauthorized', status=401)
+            return JsonResponse({'error': 'Unauthorized'}, status=401)
     return _wrapped_view
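
Any JSON endpoint can then opt in; a minimal sketch (the view is hypothetical):

from django.http import JsonResponse

from geo_lib.website.auth import login_required_401

@login_required_401
def my_protected_view(request):  # hypothetical view
    return JsonResponse({'ok': True})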

src/geo-backend/requirements.txt

@@ -1,5 +1,8 @@
 django==5.0.6
 psycopg2-binary==2.9.9
 shapely==2.0.4
-fastkml==0.12
 lxml==5.2.2
+kml2geojson==5.1.0
+dateparser==1.2.0
+geojson==3.1.0
+pydantic==2.7.3

src/geo-backend/users/urls.py

@@ -1,12 +1,9 @@
-from django.urls import re_path, path
+from django.urls import re_path

 from users.views import register, dashboard, check_auth
-from users.views.upload_item import upload_item, fetch_import_queue

 urlpatterns = [
     re_path(r"^account/register/", register, name="register"),
     re_path(r"^user/dashboard/", dashboard, name="dashboard"),
     re_path(r"^api/user/status/", check_auth),
-    re_path(r'^api/item/import/upload/', upload_item, name='upload_file'),
-    path('api/item/import/get/<int:id>', fetch_import_queue, name='fetch_import_queue'),
 ]

src/geo-backend/users/views/upload_item.py (deleted)

@@ -1,62 +0,0 @@
from django import forms
from django.contrib.auth.models import User
from django.db import models
from django.http import HttpResponse, JsonResponse

from geo_lib.spatial.kml import kml_to_geojson


class Document(models.Model):
    uploaded_at = models.DateTimeField(auto_now_add=True)
    upload = models.FileField()


class DocumentForm(forms.Form):
    file = forms.FileField()


class ImportQueue(models.Model):
    data = models.JSONField()
    user = models.ForeignKey(User, on_delete=models.CASCADE)

    class Meta:
        db_table = 'import_queue'


def upload_item(request):
    if request.method == 'POST':
        form = DocumentForm(request.POST, request.FILES)
        if form.is_valid():
            uploaded_file = request.FILES['file']
            file_data = uploaded_file.read()
            try:
                geojson = kml_to_geojson(file_data)
                import_queue, created = ImportQueue.objects.get_or_create(data=geojson, user=request.user)
                import_queue.save()
                msg = 'upload successful'
                if not created:
                    msg = 'data already exists in the import queue'
                return JsonResponse({'success': True, 'msg': msg, 'id': import_queue.id}, status=200)
            except Exception as e:
                print(e)  # TODO: logging
                return JsonResponse({'success': False, 'msg': 'failed to parse KML/KMZ', 'id': None}, status=400)
            # TODO: put the processed data into the database and then return the ID so the frontend can go to the import page and use the ID to start the import
        else:
            return JsonResponse({'success': False, 'msg': 'invalid upload structure', 'id': None}, status=400)
    else:
        return HttpResponse(status=405)


def fetch_import_queue(request, id):
    if id is None:
        return JsonResponse({'success': False, 'msg': 'ID not provided', 'code': 400}, status=400)
    try:
        queue = ImportQueue.objects.get(id=id)
        if queue.user_id != request.user.id:
            return JsonResponse({'success': False, 'msg': 'not authorized to view this item', 'code': 403}, status=400)
        return JsonResponse({'success': True, 'data': queue.data}, status=200)
    except ImportQueue.DoesNotExist:
        return JsonResponse({'success': False, 'msg': 'ID does not exist', 'code': 404}, status=400)

[Vue view: import processing page]

@@ -1,8 +1,10 @@
 <template>
   <div v-if="msg !== ''">
-    <p>{{ msg }}</p>
+    <p class="font-bold">{{ msg }}</p>
   </div>
+  <!-- TODO: loading indicator -->
   <div>
     <li v-for="(item, index) in geoJsonData" :key="`item-${index}`">
       <pre>
@@ -18,6 +20,9 @@ import {authMixin} from "@/assets/js/authMixin.js";
 import axios from "axios";
 import {capitalizeFirstLetter} from "@/assets/js/string.js";

+// TODO: for each feature, query the DB and check if there is a duplicate. For points that's duplicate coords, for linestrings and polygons that's duplicate points
+// TODO: auto-refresh if still processing
+
 export default {
   computed: {
     ...mapState(["userInfo"]),
@@ -40,16 +45,21 @@ export default {
       return item
     }
   },
-  created() {
-    axios.get('/api/item/import/get/' + this.id).then(response => {
-      if (!response.data.success) {
-        this.handleError(response.data.msg)
-      } else {
-        this.geoJsonData = response.data.data
-      }
-    }).catch(error => {
-      this.handleError(error.message)
-    });
+  beforeRouteEnter(to, from, next) {
+    next(async vm => {
+      axios.get('/api/data/item/import/get/' + vm.id).then(response => {
+        if (!response.data.success) {
+          vm.handleError(response.data.msg)
+        } else {
+          if (Object.keys(response.data.geojson).length > 0) {
+            vm.geoJsonData = response.data.geojson
+          }
+          vm.msg = response.data.msg
+        }
+      }).catch(error => {
+        vm.handleError(error.message)
+      });
+    })
   },
 };
 </script>

[Vue view: import upload page]

@@ -2,6 +2,9 @@
   <div class="mb-10">
     <p>import data</p>
     <p>Only KML/KMZ files supported.</p>
+    <p>Be careful not to upload duplicate files of the opposite type. For example, do not upload both
+      <kbd>example.kml</kbd>
+      and <kbd>example.kmz</kbd>. Currently, the system can't detect duplicate cross-file types.</p>
   </div>
   <div class="relative w-[90%] m-auto">
@@ -11,8 +14,24 @@
     </div>
   </div>
-  <div v-if="uploadMsg !== ''" class="w-[90%] m-auto mt-10" v-html="uploadMsg">
-  </div>
+  <div v-if="uploadMsg !== ''" class="w-[90%] m-auto mt-10" v-html="uploadMsg"></div>
+  <table>
+    <thead>
+    <tr>
+      <th>File Name</th>
+      <th>Features</th>
+      <th></th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr v-for="(item, index) in processQueue" :key="`item-${index}`">
+      <td><a :href="`/#/import/process/${item.id}`">{{ item.original_filename }}</a></td>
+      <td>{{ item.feature_count }}</td>
+      <td>button to delete from queue</td>
+    </tr>
+    </tbody>
+  </table>
 </template>

 <script>
@@ -21,6 +40,8 @@ import {authMixin} from "@/assets/js/authMixin.js";
 import axios from "axios";
 import {capitalizeFirstLetter} from "@/assets/js/string.js";

+// TODO: after import, don't disable the upload, instead add the new item to a table at the button and then prompt the user to continue
+
 export default {
   computed: {
     ...mapState(["userInfo"]),
@@ -31,7 +52,8 @@ export default {
     return {
       file: null,
       disableUpload: false,
-      uploadMsg: ""
+      uploadMsg: "",
+      processQueue: []
     }
   },
   methods: {
@@ -44,9 +66,13 @@ export default {
       }
     },
     upload() {
-      let formData = new FormData();
-      formData.append('file', this.file);
-      axios.post('/api/item/import/upload/', formData, {
+      this.uploadMsg = ""
+      if (this.file == null) {
+        return
+      }
+      let formData = new FormData()
+      formData.append('file', this.file)
+      axios.post('/api/data/item/import/upload/', formData, {
         headers: {
           'Content-Type': 'multipart/form-data',
           'X-CSRFToken': this.userInfo.csrftoken
@@ -56,10 +82,17 @@ export default {
         this.disableUpload = true
       }).catch(error => {
         this.handleError(error)
-      });
+      })
     },
     handleError(error) {
-      console.log(error);
+      console.error("Upload failed:", error)
+      if (error.response.data.msg != null) {
+        this.uploadMsg = error.response.data.msg
+      }
     },
+    async fetchQueueList() {
+      const response = await axios.get('/api/data/item/import/get/mine')
+      this.processQueue = response.data.data
+    }
   },
   async created() {
@@ -71,6 +104,7 @@ export default {
       vm.file = null
       vm.disableUpload = false
       vm.uploadMsg = ""
+      await vm.fetchQueueList()
     })
   },
   watch: {},