Compare commits

...

2 Commits

10 changed files with 362 additions and 498 deletions

125
export.py
View File

@ -1,26 +1,27 @@
import argparse
import json
import os
from http.cookiejar import MozillaCookieJar
from pathlib import Path
import canvasapi
import jsonpickle
import requests
import yaml
from canvasapi import Canvas
from module.const import COURSES_TO_SKIP, DL_LOCATION
from module.download_canvas import download_assignment_pages, download_course_announcement_pages, download_course_discussion_pages, download_course_files, download_course_html, download_course_module_pages, download_submission_attachments, download_course_grades_page, download_course_home_page_html
from module.get_canvas import find_course_announcements, find_course_assignments, find_course_discussions, find_course_modules, find_course_pages
from module.items import CourseView
from module.const import global_consts
from module.download_canvas import download_assignments, download_course_modules, download_course_grades_page, download_course_announcement_pages, download_course_home_page_html, download_course_discussion_pages
from module.get_canvas import find_course_pages, find_course_modules, find_course_assignments, find_course_announcements, find_course_discussions
from module.items import CanvasCourse, jsonify_anything
from module.singlefile import download_page
from module.user_files import download_user_files
SCRIPT_PATH = os.path.abspath(os.path.dirname(__file__))
def export_all_course_data(c):
json_data = json.dumps(json.loads(jsonpickle.encode(c, unpicklable=False)), indent=4)
course_output_dir = os.path.join(DL_LOCATION, c.term, c.name)
json_data = jsonify_anything(c)
course_output_dir = os.path.join(OUTPUT_LOCATION, c.term, c.name)
if not os.path.exists(course_output_dir):
os.makedirs(course_output_dir)
course_output_path = os.path.join(course_output_dir, c.name + ".json")
@ -29,6 +30,15 @@ def export_all_course_data(c):
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='')
parser.add_argument('--output', default='./output', help='Output location. If it does not exist, it will be created.')
parser.add_argument('--term', default=None, help='Only download this term.')
parser.add_argument('--user-files', action='store_true', help="Download the user files.")
args = parser.parse_args()
OUTPUT_LOCATION = Path(args.output).resolve().expanduser().absolute()
OUTPUT_LOCATION.mkdir(parents=True, exist_ok=True)
# Startup checks.
creds_file = Path(SCRIPT_PATH, 'credentials.yaml')
if not creds_file.is_file():
@ -38,47 +48,43 @@ if __name__ == "__main__":
with open("credentials.yaml", 'r') as f:
credentials = yaml.full_load(f)
API_URL = credentials["API_URL"]
API_KEY = credentials["API_KEY"]
USER_ID = credentials["USER_ID"]
COOKIES_PATH = str(Path(credentials["COOKIES_PATH"]).resolve().expanduser().absolute())
global_consts.API_URL = credentials["API_URL"]
global_consts.API_KEY = credentials["API_KEY"]
global_consts.USER_ID = credentials["USER_ID"]
global_consts.COOKIES_PATH = str(Path(credentials["COOKIES_PATH"]).resolve().expanduser().absolute())
if not Path(COOKIES_PATH).is_file():
print('The cookies file does not exist:', COOKIES_PATH)
if not Path(global_consts.COOKIES_PATH).is_file():
print('The cookies file does not exist:', global_consts.COOKIES_PATH)
quit(1)
COOKIE_JAR = MozillaCookieJar(COOKIES_PATH)
COOKIE_JAR.load(ignore_discard=True, ignore_expires=True)
global_consts.COOKIE_JAR = MozillaCookieJar(global_consts.COOKIES_PATH)
global_consts.COOKIE_JAR.load(ignore_discard=True, ignore_expires=True)
# ==================================================================================================================
# Initialization
print("Welcome to the Canvas Student Data Export Tool")
if not os.path.exists(DL_LOCATION):
print("Creating output directory:", DL_LOCATION)
os.makedirs(DL_LOCATION)
if not os.path.exists(OUTPUT_LOCATION):
print("Creating output directory:", OUTPUT_LOCATION)
os.makedirs(OUTPUT_LOCATION)
if COOKIES_PATH:
if global_consts.COOKIES_PATH:
# Test the cookies.
print("Authenticating with Canvas frontend...")
# Test the cookies.
cookies = MozillaCookieJar(COOKIES_PATH)
cookies.load(ignore_discard=True, ignore_expires=True)
# Requests takes a dict, not the MozillaCookieJar object.
request_cookies = {}
for cookie in cookies:
request_cookies[cookie.name] = cookie.value
request_cookies = {c.name: c.value for c in global_consts.COOKIE_JAR}
r = requests.get(f'{API_URL}/profile', headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}, cookies=request_cookies)
r = requests.get(f'{global_consts.API_URL}/profile', headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'}, cookies=request_cookies)
if r.status_code != 200:
print('Failed to fetch Canvas profile: got status code', r.status_code)
quit(1)
if not r.url.startswith(API_URL):
if not r.url.startswith(global_consts.API_URL):
print('Failed to fetch Canvas profile: client was redirected away from Canvas:')
print(r.url)
quit(1)
if 'profileContent__Block' not in r.text:
# TODO: add an arg to skip this check.
print('Failed to test Canvas profile: could not find an element with the class "profileContent__Block". This could mean that your authentication is incorrect.')
quit(1)
@ -87,7 +93,7 @@ if __name__ == "__main__":
print('No cookies file specified! No HTML pages will be saved.')
print("Authenticating with Canvas API...")
canvas = Canvas(API_URL, API_KEY)
canvas = Canvas(global_consts.API_URL, global_consts.API_KEY)
courses = canvas.get_courses(include="term")
try:
course_count = len(list(courses))
@ -102,16 +108,20 @@ if __name__ == "__main__":
print('')
skip = set(COURSES_TO_SKIP)
skip = set(global_consts.COURSES_TO_SKIP)
# ==================================================================================================================
# Exporting
print("Downloading courses page...")
download_course_html(API_URL, COOKIES_PATH)
courses_dict = {v['id']: v for v in json.loads(jsonify_anything(courses))['_elements']}
(global_consts.OUTPUT_LOCATION / 'courses.json').write_text(json.dumps(courses_dict))
download_page(global_consts.API_URL + "/courses/", global_consts.OUTPUT_LOCATION, "courses.html")
if args.user_files:
print('Downloading user files...')
download_user_files(canvas, OUTPUT_LOCATION / 'User Files')
print('Downloading user files...')
download_user_files(canvas, DL_LOCATION / 'User Files')
print('')
all_courses_views = []
@ -120,52 +130,57 @@ if __name__ == "__main__":
if course.id in skip or not hasattr(course, "name") or not hasattr(course, "term"):
continue
course_view = CourseView(course)
print(f"=== {course_view.term}: {course_view.name} ===")
resolved_canvas_course = CanvasCourse(course)
valid, r = course_view.test_course(API_URL, COOKIE_JAR)
if args.term and args.term != resolved_canvas_course.term:
print('Skipping term:', resolved_canvas_course.term, '\n')
continue
print(f"=== {resolved_canvas_course.term}: {resolved_canvas_course.name} ===")
valid, r = resolved_canvas_course.test_course(global_consts.API_URL, global_consts.COOKIE_JAR)
if not valid:
print(f'Invalid course: {course_view.course_id} - {r} - {r.text}')
print(f'Invalid course: {resolved_canvas_course.course_id} - {r} - {r.text}')
if r.status_code == 401:
# We can't recover from this error.
quit(1)
continue
course_view.assignments = find_course_assignments(course, USER_ID)
course_view.announcements = find_course_announcements(course)
course_view.discussions = find_course_discussions(course)
course_view.pages = find_course_pages(course)
course_view.modules = find_course_modules(course, course_view)
all_courses_views.append(course_view)
resolved_canvas_course.modules = find_course_modules(course)
resolved_canvas_course.assignments = find_course_assignments(course)
resolved_canvas_course.announcements = find_course_announcements(course)
resolved_canvas_course.discussions = find_course_discussions(course)
resolved_canvas_course.pages = find_course_pages(course)
all_courses_views.append(resolved_canvas_course)
print('Downloading course home page...')
download_course_home_page_html(API_URL, course_view, COOKIES_PATH)
download_course_home_page_html(resolved_canvas_course)
print('Downloading grades...')
download_course_grades_page(API_URL, course_view, COOKIES_PATH)
download_course_grades_page(resolved_canvas_course)
download_assignment_pages(API_URL, course_view, COOKIES_PATH, COOKIE_JAR)
download_assignments(resolved_canvas_course)
download_course_module_pages(API_URL, course_view, COOKIES_PATH)
download_course_modules(resolved_canvas_course)
download_course_announcement_pages(API_URL, course_view, COOKIES_PATH)
download_course_announcement_pages(resolved_canvas_course)
download_course_discussion_pages(API_URL, course_view, COOKIES_PATH)
download_course_discussion_pages(resolved_canvas_course)
download_course_files(course, course_view)
download_submission_attachments(course, course_view)
# TODO: nothing to test this on
# download_course_files(course)
print("Exporting course metadata...")
export_all_course_data(course_view)
export_all_course_data(resolved_canvas_course)
if course_count > 1:
print('')
# Remove elements from the course objects that can't be JSON serialized, then format it.
json_str = json.dumps(json.loads(jsonpickle.encode(all_courses_views, unpicklable=False)), indent=4)
json_str = jsonify_anything(all_courses_views)
all_output_path = os.path.join(DL_LOCATION, "all_output.json")
all_output_path = os.path.join(OUTPUT_LOCATION, "all_output.json")
with open(all_output_path, "w") as out_file:
out_file.write(json_str)

0
module/api/__init__.py Normal file
View File

21
module/api/file.py Normal file
View File

@ -0,0 +1,21 @@
import re
import canvasapi
from canvasapi.course import Course
HTML_ITEM_ATTACHED_FILE_RE = re.compile(r'<a .*? data-api-endpoint=\"(.*?)\" .*?>')
CANVAS_API_FILE_ID_RE = re.compile(r'.*?/api/v1/courses/.*?/files/(.*?)$')
def get_embedded_files(course: Course, html: str):
attached_files = set()
file_matches = re.findall(HTML_ITEM_ATTACHED_FILE_RE, html)
for match in file_matches:
file_id = re.match(CANVAS_API_FILE_ID_RE, match)
if file_id:
try:
canvas_file = course.get_file(file_id.group(1))
attached_files.add(canvas_file)
except canvasapi.exceptions.ResourceDoesNotExist:
continue
return attached_files

View File

@ -1,14 +1,28 @@
from http.cookiejar import MozillaCookieJar
from pathlib import Path
# Directory in which to download course information to (will be created if not present)
DL_LOCATION = Path("./output").resolve().expanduser().absolute()
# List of Course IDs that should be skipped (need to be integers)
COURSES_TO_SKIP = [288290, 512033]
class GlobalConsts:
# Directory in which to download course information to (will be created if not present)
OUTPUT_LOCATION = Path("./output").resolve().expanduser().absolute()
DATE_TEMPLATE = "%B %d, %Y %I:%M %p"
# List of Course IDs that should be skipped (need to be integers)
COURSES_TO_SKIP = []
# Max PATH length is 260 characters on Windows. 70 is just an estimate for a reasonable max folder name to prevent the chance of reaching the limit
# Applies to modules, assignments, announcements, and discussions
# If a folder exceeds this limit, a "-" will be added to the end to indicate it was shortened ("..." not valid)
MAX_FOLDER_NAME_SIZE = 70
DATE_TEMPLATE = "%B %d, %Y %I:%M %p"
# Max PATH length is 260 characters on Windows. 70 is just an estimate for a reasonable max folder name to prevent the chance of reaching the limit
# Applies to modules, assignments, announcements, and discussions
# If a folder exceeds this limit, a "-" will be added to the end to indicate it was shortened ("..." not valid)
MAX_FOLDER_NAME_SIZE = 70
COOKIES_PATH = ""
COOKIE_JAR = MozillaCookieJar()
API_URL = ""
API_KEY = ""
USER_ID = ""
global_consts = GlobalConsts()

View File

@ -1,26 +1,20 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from http.cookiejar import MozillaCookieJar
from pathlib import Path
import canvasapi
import requests
from tqdm import tqdm
from module.const import DL_LOCATION, MAX_FOLDER_NAME_SIZE
from module.api.file import get_embedded_files
from module.const import global_consts
from module.helpers import make_valid_filename, make_valid_folder_path, shorten_file_name
from module.items import CanvasCourse, jsonify_anything
from module.singlefile import download_page
from module.threading import download_assignment, download_module_item
def download_course_files(course, course_view):
# file full_name starts with "course files"
dl_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name)
# Create directory if not present
if not os.path.exists(dl_dir):
os.makedirs(dl_dir)
dl_dir = global_consts.OUTPUT_LOCATION / course_view.term / course_view.name
dl_dir.mkdir(parents=True, exist_ok=True)
try:
files = list(course.get_files())
@ -31,206 +25,109 @@ def download_course_files(course, course_view):
for file in tqdm(files, desc='Downloading Files'):
try:
file_folder = course.get_folder(file.folder_id)
folder_dl_dir = os.path.join(dl_dir, make_valid_folder_path(file_folder.full_name))
if not os.path.exists(folder_dl_dir):
os.makedirs(folder_dl_dir)
dl_path = os.path.join(folder_dl_dir, make_valid_filename(str(file.display_name)))
# Download file if it doesn't already exist
if not os.path.exists(dl_path):
# print('Downloading: {}'.format(dl_path))
file.download(dl_path)
folder_dl_dir = dl_dir / make_valid_folder_path(file_folder.full_name)
folder_dl_dir.mkdir(parents=True, exist_ok=True)
dl_path = folder_dl_dir / make_valid_filename(str(file.display_name))
file.download(dl_path)
except Exception as e:
tqdm.write(f"Skipping {file.display_name} - {e}")
def download_course_discussion_pages(api_url, course_view, cookies_path):
if cookies_path == "" or len(course_view.discussions) == 0:
def download_course_discussion_pages(resolved_course: CanvasCourse):
if not len(resolved_course.discussions):
return
base_discussion_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name, "discussions")
if not os.path.exists(base_discussion_dir):
os.makedirs(base_discussion_dir)
base_discussion_dir = global_consts.OUTPUT_LOCATION / resolved_course.term / resolved_course.name / 'discussions'
base_discussion_dir.mkdir(parents=True, exist_ok=True)
discussion_list_dir = os.path.join(base_discussion_dir, "discussion_list.html")
# (base_discussion_dir / 'discussions.json').write_text(jsonify_anything(resolved_course.discussions))
download_page(global_consts.API_URL + "/courses/" + str(resolved_course.course_id) + "/discussion_topics/", base_discussion_dir, "discussions.html")
# Download assignment list (theres a chance this might be the course homepage if the course has the assignments page disabled)
if not os.path.exists(discussion_list_dir):
download_page(api_url + "/courses/" + str(course_view.course_id) + "/discussion_topics/", cookies_path, base_discussion_dir, "discussion_list.html")
for discussion in tqdm(list(course_view.discussions), desc='Downloading Discussions'):
for discussion in tqdm(list(resolved_course.discussions), desc='Downloading Discussions'):
discussion_title = make_valid_filename(str(discussion.title))
discussion_title = shorten_file_name(discussion_title, len(discussion_title) - MAX_FOLDER_NAME_SIZE)
discussion_dir = os.path.join(base_discussion_dir, discussion_title)
discussion_title = shorten_file_name(discussion_title, len(discussion_title) - global_consts.MAX_FOLDER_NAME_SIZE)
discussion_dir = base_discussion_dir / discussion_title
if discussion.url == "":
if not discussion.url:
continue
if not os.path.exists(discussion_dir):
os.makedirs(discussion_dir)
discussion_dir.mkdir(parents=True, exist_ok=True)
for file in get_embedded_files(resolved_course.course, discussion.body):
file.download(discussion_dir / file.display_name)
# Downloads each page that a discussion takes.
for i in range(discussion.amount_pages):
filename = "discussion_" + str(i + 1) + ".html"
discussion_page_dir = os.path.join(discussion_dir, filename)
# Download assignment page, this usually has instructions and etc.
if not os.path.exists(discussion_page_dir):
download_page(discussion.url + "/page-" + str(i + 1), cookies_path, discussion_dir, filename)
download_page(discussion.url + "/page-" + str(i + 1), discussion_dir, filename)
def download_assignment_pages(api_url, course_view, cookies_path, cookie_jar: MozillaCookieJar):
if cookies_path == "" or len(course_view.assignments) == 0:
def download_assignments(course_view: CanvasCourse):
if not len(course_view.assignments):
return
base_assign_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name, "assignments")
if not os.path.exists(base_assign_dir):
os.makedirs(base_assign_dir)
base_assign_dir = global_consts.OUTPUT_LOCATION / course_view.term / course_view.name / 'assignments'
base_assign_dir.mkdir(parents=True, exist_ok=True)
assignment_list_path = os.path.join(base_assign_dir, "assignment_list.html")
# Download assignment list (theres a chance this might be the course homepage if the course has the assignments page disabled)
if not os.path.exists(assignment_list_path):
download_page(api_url + "/courses/" + str(course_view.course_id) + "/assignments/", cookies_path, base_assign_dir, "assignment_list.html")
# (base_assign_dir / 'assignments.json').write_text(jsonify_anything(course_view.assignments))
download_page(global_consts.API_URL + "/courses/" + str(course_view.course_id) + "/assignments/", base_assign_dir, "assignments.html")
with ThreadPoolExecutor(max_workers=3) as executor:
download_func = partial(download_assignment, cookies_path, cookie_jar, base_assign_dir)
download_func = partial(download_assignment, base_assign_dir, course_view.course)
list(tqdm(executor.map(download_func, course_view.assignments), total=len(course_view.assignments), desc='Downloading Assignments'))
def download_course_announcement_pages(api_url, course_view, cookies_path):
"""
Download assignment list.
There's a chance this might be the course homepage if the course has the assignments page disabled.
:param api_url:
:param course_view:
:param cookies_path:
:return:
"""
if cookies_path == "" or len(course_view.announcements) == 0:
def download_course_announcement_pages(resolved_course: CanvasCourse):
if not len(resolved_course.announcements):
return
base_announce_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name, "announcements")
if not os.path.exists(base_announce_dir):
os.makedirs(base_announce_dir)
announcement_list_dir = os.path.join(base_announce_dir, "announcement_list.html")
if not os.path.exists(announcement_list_dir):
download_page(api_url + "/courses/" + str(course_view.course_id) + "/announcements/", cookies_path, base_announce_dir, "announcement_list.html")
base_announce_dir = global_consts.OUTPUT_LOCATION / resolved_course.term / resolved_course.name / 'announcements'
base_announce_dir.mkdir(parents=True, exist_ok=True)
for announcements in tqdm(list(course_view.announcements), desc='Downloading Announcements'):
announcements_title = make_valid_filename(str(announcements.title))
announcements_title = shorten_file_name(announcements_title, len(announcements_title) - MAX_FOLDER_NAME_SIZE)
announce_dir = os.path.join(base_announce_dir, announcements_title)
# (base_announce_dir / 'announcements.json').write_text(jsonify_anything(resolved_course.announcements))
download_page(global_consts.API_URL + "/courses/" + str(resolved_course.course_id) + "/announcements/", base_announce_dir, "announcements.html")
if announcements.url == "":
for announcement in tqdm(list(resolved_course.announcements), desc='Downloading Announcements'):
announcements_title = make_valid_filename(str(announcement.title))
announcements_title = shorten_file_name(announcements_title, len(announcements_title) - global_consts.MAX_FOLDER_NAME_SIZE)
announce_dir = base_announce_dir / announcements_title
if not announcement.url:
continue
if not os.path.exists(announce_dir):
os.makedirs(announce_dir)
announce_dir.mkdir(parents=True, exist_ok=True)
# Downloads each page that a discussion takes.
for i in range(announcements.amount_pages):
for file in get_embedded_files(resolved_course.course, announcement.body):
file.download(announce_dir / file.display_name)
for i in range(announcement.amount_pages):
filename = "announcement_" + str(i + 1) + ".html"
announcement_page_dir = os.path.join(announce_dir, filename)
# Download assignment page, this usually has instructions and etc.
if not os.path.exists(announcement_page_dir):
download_page(announcements.url + "/page-" + str(i + 1), cookies_path, announce_dir, filename)
download_page(announcement.url + "/page-" + str(i + 1), announce_dir, filename)
def download_submission_attachments(course, course_view):
course_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name)
# Create directory if not present
if not os.path.exists(course_dir):
os.makedirs(course_dir)
for assignment in tqdm(list(course_view.assignments), desc='Downloading Submissions'):
for submission in assignment.submissions:
assignment_title = make_valid_filename(str(assignment.title))
assignment_title = shorten_file_name(assignment_title, len(assignment_title) - MAX_FOLDER_NAME_SIZE)
attachment_dir = os.path.join(course_dir, "assignments", assignment_title)
if len(assignment.submissions) != 1:
attachment_dir = os.path.join(attachment_dir, str(submission.user_id))
if not os.path.exists(attachment_dir) and submission.attachments:
os.makedirs(attachment_dir)
for attachment in submission.attachments:
filepath = os.path.join(attachment_dir, make_valid_filename(str(attachment.id) + "_" + attachment.filename))
if not os.path.exists(filepath):
# print('Downloading attachment: {}'.format(filepath))
r = requests.get(attachment.url, allow_redirects=True)
with open(filepath, 'wb') as f:
f.write(r.content)
# else:
# print('File already exists: {}'.format(filepath))
def download_course_home_page_html(course_view):
dl_dir = global_consts.OUTPUT_LOCATION / course_view.term / course_view.name
dl_dir.mkdir(parents=True, exist_ok=True)
download_page(global_consts.API_URL + "/courses/" + str(course_view.course_id), dl_dir, "homepage.html")
def download_course_html(api_url, cookies_path):
if cookies_path == "":
return
def download_course_modules(course_view: CanvasCourse):
modules_dir = global_consts.OUTPUT_LOCATION / course_view.term / course_view.name / 'modules'
modules_dir.mkdir(parents=True, exist_ok=True)
course_dir = DL_LOCATION
if not os.path.exists(course_dir):
os.makedirs(course_dir)
course_list_path = os.path.join(course_dir, "course_list.html")
# Downloads the course list.
if not os.path.exists(course_list_path):
download_page(api_url + "/courses/", cookies_path, course_dir, "course_list.html")
def download_course_home_page_html(api_url, course_view, cookies_path):
if cookies_path == "":
return
dl_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name)
if not os.path.exists(dl_dir):
os.makedirs(dl_dir)
homepage_path = os.path.join(dl_dir, "homepage.html")
# Downloads the course home page.
if not os.path.exists(homepage_path):
download_page(api_url + "/courses/" + str(course_view.course_id), cookies_path, dl_dir, "homepage.html")
def download_course_module_pages(api_url, course_view, cookies_path):
if cookies_path == "" or len(course_view.modules) == 0:
return
modules_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name, "modules")
if not os.path.exists(modules_dir):
os.makedirs(modules_dir)
module_list_dir = os.path.join(modules_dir, "modules_list.html")
# Downloads the modules page (possible this is disabled by the teacher)
if not os.path.exists(module_list_dir):
download_page(api_url + "/courses/" + str(course_view.course_id) + "/modules/", cookies_path, modules_dir, "modules_list.html")
# (modules_dir / 'modules.json').write_text(jsonify_anything(course_view.modules))
download_page(global_consts.API_URL + "/courses/" + str(course_view.course_id) + "/modules/", modules_dir, "modules.html")
with ThreadPoolExecutor(max_workers=3) as executor:
for module in tqdm(list(course_view.modules), desc='Downloading Module Pages'):
bar = tqdm(list(module.items), leave=False, desc=module.name)
futures = [executor.submit(download_module_item, module, item, modules_dir, cookies_path) for item in module.items]
for module in tqdm(list(course_view.modules), desc='Downloading Modules'):
bar = tqdm(list(module.items), leave=False, desc=module.module.name)
futures = [executor.submit(download_module_item, course_view.course, module, item, modules_dir) for item in module.items]
for _ in as_completed(futures):
bar.update()
bar.close()
def download_course_grades_page(api_url, course_view, cookies_path):
if cookies_path == "":
return
dl_dir = Path(DL_LOCATION, course_view.term, course_view.name)
def download_course_grades_page(course_view: CanvasCourse):
dl_dir = global_consts.OUTPUT_LOCATION / course_view.term / course_view.name
dl_dir.mkdir(parents=True, exist_ok=True)
# TODO: command line arg to prohibit overwrite. Default should overwrite
if not (dl_dir / "grades.html").exists():
api_target = f'{api_url}/courses/{course_view.course_id}/grades'
download_page(api_target, cookies_path, dl_dir, "grades.html")
api_target = f'{global_consts.API_URL}/courses/{course_view.course_id}/grades'
download_page(api_target, dl_dir, "grades.html")

View File

@ -1,99 +1,51 @@
import os
from http.cookiejar import MozillaCookieJar
import re
from typing import List
import canvasapi
import dateutil.parser
import requests
from bs4 import BeautifulSoup
from canvasapi.discussion_topic import DiscussionTopic
from tqdm import tqdm
from module.const import DATE_TEMPLATE, DL_LOCATION, MAX_FOLDER_NAME_SIZE
from module.helpers import make_valid_filename, shorten_file_name
from module.items import AssignmentView, AttachmentView, DiscussionView, ModuleItemView, ModuleView, PageView, SubmissionView, TopicEntryView, TopicReplyView
from module.const import global_consts
from module.items import CanvasDiscussion, CanvasPage, CanvasTopicEntry, CanvasTopicReply, CanvasModule
HTML_ITEM_ATTACHED_FILE_RE = re.compile(r'<a .*? data-api-endpoint=\"(.*?)\" .*?>')
CANVAS_API_FILE_ID_RE = re.compile(r'.*?/api/v1/courses/.*?/files/(.*?)$')
def find_course_modules(course, course_view):
modules_dir = os.path.join(DL_LOCATION, course_view.term, course_view.name, "modules")
def find_course_modules(course) -> List[CanvasModule]:
# modules_dir = os.path.join(global_consts.OUTPUT_LOCATION, course_view.term, course_view.name, "modules")
# Create modules directory if not present
if not os.path.exists(modules_dir):
os.makedirs(modules_dir)
module_views = []
results = []
try:
modules = list(course.get_modules())
for module in tqdm(modules, desc='Downloading Module Files'):
module_view = ModuleView()
module_view.id = module.id if hasattr(module, "id") else ""
module_view.name = str(module.name) if hasattr(module, "name") else ""
for module in tqdm(modules, desc='Fetching Modules'):
try:
# Get module items
module_items = module.get_module_items()
for module_item in module_items:
module_item_view = ModuleItemView()
module_item_view.id = module_item.id if hasattr(module_item, "id") else 0
module_item_view.title = str(module_item.title).replace(' ', ' ') if hasattr(module_item, "title") else ""
module_item_view.content_type = str(module_item.type) if hasattr(module_item, "type") else ""
module_item_view.url = str(module_item.html_url) if hasattr(module_item, "html_url") else ""
module_item_view.external_url = str(module_item.external_url) if hasattr(module_item, "external_url") else ""
if module_item_view.content_type == "File":
# If problems arise due to long pathnames, changing module.name to module.id might help
# A change would also have to be made in downloadCourseModulePages(api_url, course_view, cookies_path)
module_name = make_valid_filename(str(module.name))
module_name = shorten_file_name(module_name, len(module_name) - MAX_FOLDER_NAME_SIZE)
module_dir = os.path.join(modules_dir, module_name, "files")
try:
# Create directory for current module if not present
if not os.path.exists(module_dir):
os.makedirs(module_dir)
# Get the file object
module_file = course.get_file(str(module_item.content_id))
# Create path for module file download
module_file_path = os.path.join(module_dir, make_valid_filename(str(module_file.display_name)))
# Download file if it doesn't already exist
if not os.path.exists(module_file_path):
module_file.download(module_file_path)
except Exception as e:
tqdm.write(f"Skipping module file download that gave the following error: {e} - {module_item}")
module_view.items.append(module_item_view)
resolved_module = CanvasModule(module)
for item in resolved_module.items:
if item.item.type == 'Page':
page = course.get_page(item.item.page_url)
item.page = page
if hasattr(page, 'body'):
# Extract the attached files from the item's HTML.
file_matches = re.findall(HTML_ITEM_ATTACHED_FILE_RE, page.body)
for match in file_matches:
file_id = re.match(CANVAS_API_FILE_ID_RE, match)
if file_id:
try:
# Grab the metadata from the API.
canvas_file = course.get_file(file_id.group(1))
item.attached_files.add(canvas_file)
except canvasapi.exceptions.ResourceDoesNotExist:
continue
results.append(resolved_module)
except Exception as e:
tqdm.write(f"Skipping module file download that gave the following error: {e}")
module_views.append(module_view)
except Exception as e:
print("Skipping entire module that gave the following error:")
print(e)
tqdm.write(f"Skipping module file download that gave the following error: {e}")
return module_views
def get_extra_assignment_files(html, cookie_jar: MozillaCookieJar):
soup = BeautifulSoup(html, 'html.parser')
urls = [a['data-api-endpoint'] for a in soup.find_all('a', {'data-api-returntype': 'File'})]
s = requests.Session()
for cookie in cookie_jar:
s.cookies.set(cookie.name, cookie.value)
extra_files = []
for item in urls:
r = s.get(item)
if r.status_code != 200:
continue
j = r.json()
extra_files.append((j['display_name'], j['url']))
return extra_files
return results
def get_course_page_urls(course):
@ -118,18 +70,18 @@ def find_course_pages(course):
for url in tqdm(page_urls, desc='Fetching Pages'):
page = course.get_page(url)
page_view = PageView()
page_view = CanvasPage()
page_view.id = page.id if hasattr(page, "id") else 0
page_view.title = str(page.title).replace(' ', ' ') if hasattr(page, "title") else ""
page_view.body = str(page.body) if hasattr(page, "body") else ""
if hasattr(page, "created_at"):
page_view.created_date = dateutil.parser.parse(page.created_at).strftime(DATE_TEMPLATE)
page_view.created_date = dateutil.parser.parse(page.created_at).strftime(global_consts.DATE_TEMPLATE)
else:
page_view.created_date = ''
if hasattr(page, "updated_at"):
page_view.last_updated_date = dateutil.parser.parse(page.updated_at).strftime(DATE_TEMPLATE)
page_view.last_updated_date = dateutil.parser.parse(page.updated_at).strftime(global_consts.DATE_TEMPLATE)
else:
page_view.last_updated_date = ''
@ -140,83 +92,31 @@ def find_course_pages(course):
return page_views
def find_course_assignments(course, user_id):
assignment_views = []
# Get all assignments
def find_course_assignments(course):
results = []
assignments = list(course.get_assignments())
for assignment in tqdm(assignments, desc='Fetching Assignments'):
assignment_view = AssignmentView()
assignment_view.id = assignment.id if hasattr(assignment, "id") else ""
assignment_view.title = make_valid_filename(str(assignment.name).replace(' ', ' ')) if hasattr(assignment, "name") else ""
assignment_view.description = str(assignment.description) if hasattr(assignment, "description") else ""
assignment_view.assigned_date = assignment.created_at_date.strftime(DATE_TEMPLATE) if hasattr(assignment, "created_at_date") else ""
assignment_view.due_date = assignment.due_at_date.strftime(DATE_TEMPLATE) if hasattr(assignment, "due_at_date") else ""
assignment_view.html_url = assignment.html_url if hasattr(assignment, "html_url") else ""
assignment_view.ext_url = str(assignment.url) if hasattr(assignment, "url") else ""
assignment_view.updated_url = str(assignment.submissions_download_url).split("submissions?")[0] if hasattr(assignment, "submissions_download_url") else ""
# Download submission for this user only
submissions = [assignment.get_submission(user_id)]
if not len(submissions):
raise IndexError(f'No submissions found for assignment: {vars(assignment)}')
try:
for submission in submissions:
sub_view = SubmissionView()
sub_view.id = submission.id if hasattr(submission, "id") else 0
sub_view.grade = str(submission.grade) if hasattr(submission, "grade") else ""
sub_view.raw_score = str(submission.score) if hasattr(submission, "score") else ""
sub_view.total_possible_points = str(assignment.points_possible) if hasattr(assignment, "points_possible") else ""
sub_view.submission_comments = str(submission.submission_comments) if hasattr(submission, "submission_comments") else ""
sub_view.attempt = submission.attempt if hasattr(submission, "attempt") and submission.attempt is not None else 0
sub_view.user_id = str(submission.user_id) if hasattr(submission, "user_id") else ""
sub_view.preview_url = str(submission.preview_url) if hasattr(submission, "preview_url") else ""
sub_view.ext_url = str(submission.url) if hasattr(submission, "url") else ""
try:
submission.attachments
except AttributeError:
print('No attachments')
else:
for attachment in submission.attachments:
attach_view = AttachmentView()
attach_view.url = attachment.url
attach_view.id = attachment.id
attach_view.filename = attachment.filename
sub_view.attachments.append(attach_view)
assignment_view.submissions.append(sub_view)
except Exception as e:
raise
# print("Skipping submission that gave the following error:")
# print(e)
assignment_views.append(assignment_view)
return assignment_views
# Have to re-define the object because the `/api/v1/courses/:course_id/assignments` endpoint is sometimes outdated.
# The endpoint `/api/v1/courses/:course_id/assignments/:id` has the most up to date data.
assignment = course.get_assignment(assignment.id)
results.append(assignment)
return results
def find_course_announcements(course):
announcement_views = []
# try:
announcements = list(course.get_discussion_topics(only_announcements=True))
announcements: List[DiscussionTopic] = list(course.get_discussion_topics(only_announcements=True))
for announcement in tqdm(announcements, desc='Fetching Announcements'):
discussion_view = get_discussion_view(announcement)
announcement_views.append(discussion_view)
# except Exception as e:
# print("Skipping announcement that gave the following error:")
# print(e)
return announcement_views
def get_discussion_view(discussion_topic):
# Create discussion view
discussion_view = DiscussionView()
discussion_view = CanvasDiscussion(discussion_topic)
discussion_view.id = discussion_topic.id if hasattr(discussion_topic, "id") else 0
discussion_view.title = str(discussion_topic.title).replace(' ', ' ') if hasattr(discussion_topic, "title") else ""
discussion_view.author = str(discussion_topic.user_name) if hasattr(discussion_topic, "user_name") else ""
@ -236,7 +136,7 @@ def get_discussion_view(discussion_topic):
topic_entries_counter += 1
# Create new discussion view for the topic_entry
topic_entry_view = TopicEntryView()
topic_entry_view = CanvasTopicEntry()
topic_entry_view.id = topic_entry.id if hasattr(topic_entry, "id") else 0
topic_entry_view.author = str(topic_entry.user_name) if hasattr(topic_entry, "user_name") else ""
topic_entry_view.posted_date = topic_entry.created_at_date.strftime("%B %d, %Y %I:%M %p") if hasattr(topic_entry, "created_at_date") else ""
@ -248,7 +148,7 @@ def get_discussion_view(discussion_topic):
try:
for topic_reply in topic_entry_replies:
# Create new topic reply view
topic_reply_view = TopicReplyView()
topic_reply_view = CanvasTopicReply()
topic_reply_view.id = topic_reply.id if hasattr(topic_reply, "id") else 0
topic_reply_view.author = str(topic_reply.user_name) if hasattr(topic_reply, "user_name") else ""
topic_reply_view.posted_date = topic_reply.created_at_date.strftime("%B %d, %Y %I:%M %p") if hasattr(topic_reply, "created_at_date") else ""
@ -272,15 +172,8 @@ def get_discussion_view(discussion_topic):
def find_course_discussions(course):
discussion_views = []
# try:
discussion_topics = list(course.get_discussion_topics())
for discussion_topic in tqdm(discussion_topics, desc='Fetching Discussions'):
discussion_view = get_discussion_view(discussion_topic)
discussion_views.append(discussion_view)
# except Exception as e:
# print("Skipping discussion that gave the following error:")
# print(e)
return discussion_views

View File

@ -1,27 +1,64 @@
import json
from http.cookiejar import MozillaCookieJar
from typing import List, Any
import requests
from canvasapi.assignment import Assignment
from canvasapi.course import Course
from canvasapi.file import File
from canvasapi.module import ModuleItem, Module
from canvasapi.page import Page
from module.helpers import make_valid_filename
class ModuleItemView:
def __init__(self):
self.id = 0
self.title = ""
self.content_type = ""
self.url = ""
self.external_url = ""
def varsify(item) -> Any:
result = {}
try:
if isinstance(item, (str, int, float, bool)):
return item
elif isinstance(item, (list, set)):
l_result = []
for i, x in enumerate(item):
l_result.append(varsify(x))
return l_result
else:
for k, v in vars(item).items():
if isinstance(v, dict):
result[k] = varsify(v)
elif isinstance(v, list):
result[k] = []
for i, x in enumerate(v):
result[k].insert(i, varsify(x))
else:
if not k.startswith('_'):
result[k] = varsify(v)
return result
except:
return item
class ModuleView:
def __init__(self):
self.id = 0
self.name = ""
self.items = []
def jsonify_anything(item):
return json.dumps(varsify(item), indent=4, sort_keys=True, default=str)
class PageView:
class CanvasModuleItem:
def __init__(self, module_item: ModuleItem):
self.item = module_item
self.attached_files: set[File] = set()
self.page: Page
class CanvasModule:
def __init__(self, module: Module):
self.module = module
self.items: List[CanvasModuleItem] = []
for item in module.get_module_items():
i = self.module.get_module_item(item.id)
self.items.append(CanvasModuleItem(i))
class CanvasPage:
def __init__(self):
self.id = 0
self.title = ""
@ -30,7 +67,7 @@ class PageView:
self.last_updated_date = ""
class TopicReplyView:
class CanvasTopicReply:
def __init__(self):
self.id = 0
self.author = ""
@ -38,7 +75,7 @@ class TopicReplyView:
self.body = ""
class TopicEntryView:
class CanvasTopicEntry:
def __init__(self):
self.id = 0
self.author = ""
@ -47,8 +84,9 @@ class TopicEntryView:
self.topic_replies = []
class DiscussionView:
def __init__(self):
class CanvasDiscussion:
def __init__(self, discussion):
self.discussion = discussion
self.id = 0
self.title = ""
self.author = ""
@ -59,7 +97,7 @@ class DiscussionView:
self.amount_pages = 0
class SubmissionView:
class CanvasSubmission:
def __init__(self):
self.id = 0
self.attachments = []
@ -73,41 +111,25 @@ class SubmissionView:
self.ext_url = ""
class AttachmentView:
def __init__(self):
self.id = 0
self.filename = ""
self.url = ""
class AssignmentView:
def __init__(self):
self.id = 0
self.title = ""
self.description = ""
self.assigned_date = ""
self.due_date = ""
self.submissions = []
self.html_url = ""
self.ext_url = ""
self.updated_url = ""
class CourseView:
class CanvasCourse:
def __init__(self, course):
self.course: Course = course
self.course_id = course.id if hasattr(course, "id") else 0
self.term = make_valid_filename(course.term["name"] if hasattr(course, "term") and "name" in course.term.keys() else "")
self.course_code = make_valid_filename(course.course_code if hasattr(course, "course_code") else "")
self.name = course.name if hasattr(course, "name") else ""
if hasattr(course, 'original_name'):
self.name = course.original_name
else:
self.name = course.name if hasattr(course, "name") else ""
self.course_code = self.course_code.replace(' ', ' ')
self.name = self.name.replace(' ', ' ')
self.assignments = []
self.announcements = []
self.discussions = []
self.modules = []
self.assignments: List[Assignment] = []
self.announcements: List[CanvasDiscussion] = []
self.discussions: List[CanvasDiscussion] = []
self.modules: List[CanvasModule] = []
def test_course(self, base_url: str, cookie_jar: MozillaCookieJar):
s = requests.Session()

View File

@ -1,6 +1,8 @@
from pathlib import Path
from subprocess import run
from .const import global_consts
SINGLEFILE_BINARY_PATH = "./node_modules/single-file/cli/single-file"
# TODO: have this be specified by a required arg.
@ -11,7 +13,7 @@ def add_quotes(s):
return "\"" + str(s).strip("\"") + "\""
def download_page(url, cookies_path, output_path, output_name_template=""):
def download_page(url, output_path, output_name_template=""):
# TODO: we can probably safely exclude pages that match the regex r'/external_tools/retrieve\?'
if output_name_template and Path(output_path, output_name_template).exists():
@ -21,7 +23,7 @@ def download_page(url, cookies_path, output_path, output_name_template=""):
args = [
add_quotes(SINGLEFILE_BINARY_PATH),
"--browser-executable-path=" + add_quotes(CHROME_PATH.strip("\"")),
"--browser-cookies-file=" + add_quotes(cookies_path),
"--browser-cookies-file=" + add_quotes(global_consts.COOKIES_PATH),
"--output-directory=" + add_quotes(output_path),
add_quotes(url)
]

View File

@ -1,79 +1,80 @@
import os
import traceback
from pathlib import Path
from module.singlefile import download_page
from module.const import MAX_FOLDER_NAME_SIZE
from module.download import download_file
from module.get_canvas import get_extra_assignment_files
from canvasapi.assignment import Assignment
from canvasapi.course import Course
from canvasapi.submission import Submission
from module.api.file import get_embedded_files
from module.const import global_consts
from module.helpers import make_valid_filename, shorten_file_name
from module.items import CanvasModuleItem, jsonify_anything, CanvasModule
from module.singlefile import download_page
def download_module_item(module, item, modules_dir, cookies_path):
# If problems arise due to long pathnames, changing module.name to module.id might help, this can also be done with item.title
# A change would also have to be made in findCourseModules(course, course_view)
module_name = make_valid_filename(str(module.name))
module_name = shorten_file_name(module_name, len(module_name) - MAX_FOLDER_NAME_SIZE)
items_dir = os.path.join(modules_dir, module_name)
def download_module_item(course: Course, module: CanvasModule, item: CanvasModuleItem, modules_dir: Path):
try:
module_name = make_valid_filename(str(module.module.name))
module_name = shorten_file_name(module_name, len(module_name) - global_consts.MAX_FOLDER_NAME_SIZE)
module_dir = modules_dir / module_name
if item.url != "":
if not os.path.exists(items_dir):
os.makedirs(items_dir)
if not hasattr(item.item, 'url') or not item.item.url:
return
filename = make_valid_filename(str(item.title)) + ".html"
module_item_dir = os.path.join(items_dir, filename)
module_dir.mkdir(parents=True, exist_ok=True)
if item.item.type == "File":
file = course.get_file(item.item.content_id)
module_file_path = module_dir / make_valid_filename(str(file.display_name))
file.download(module_file_path)
else:
# It's a page, so download the attached files.
for file in item.attached_files:
file.download(module_dir / file.filename)
# Download the module page.
if not os.path.exists(module_item_dir):
download_page(item.url, cookies_path, items_dir, filename)
html_filename = make_valid_filename(str(item.item.title)) + ".html"
download_page(item.item.html_url, module_dir, html_filename)
except:
# TODO: wrap all threaded funcs in this try/catch
traceback.print_exc()
def download_assignment(cookies_path, cookie_jar, base_assign_dir, assignment):
assignment_title = make_valid_filename(str(assignment.title))
assignment_title = shorten_file_name(assignment_title, len(assignment_title) - MAX_FOLDER_NAME_SIZE)
assign_dir = os.path.join(base_assign_dir, assignment_title)
def download_assignment(base_assign_dir: Path, course: Course, assignment: Assignment):
try:
assignment_title = make_valid_filename(str(assignment.name))
assignment_title = shorten_file_name(assignment_title, len(assignment_title) - global_consts.MAX_FOLDER_NAME_SIZE)
assign_dir = Path(base_assign_dir, assignment_title)
assign_dir.mkdir(parents=True, exist_ok=True)
if assignment.html_url != "":
if not os.path.exists(assign_dir):
os.makedirs(assign_dir)
if assignment.html_url:
download_page(assignment.html_url, assign_dir, "assignment.html")
assignment_page_path = os.path.join(assign_dir, "assignment.html")
# Download attached files.
if assignment.description:
for file in get_embedded_files(course, assignment.description):
file.download(assign_dir / file.display_name)
if not os.path.exists(assignment_page_path):
download_page(assignment.html_url, cookies_path, assign_dir, "assignment.html")
extra_files = get_extra_assignment_files(assignment.description, cookie_jar)
for name, url in extra_files:
download_file(url, Path(assign_dir, name), cookie_jar)
for submission in assignment.submissions:
download_submission(assignment, submission, assign_dir, cookies_path)
# Students cannot view their past attempts, but this logic is left if that's ever implemented in Canvas.
submissions = [assignment.get_submission(global_consts.USER_ID)]
for submission in submissions:
download_attempt(submission, assign_dir)
submission_dir = assign_dir / 'submission' / str(submission.id)
for attachment in submission.attachments:
filepath = submission_dir / attachment.display_name
if not filepath.exists():
attachment.download(filepath)
except:
traceback.print_exc()
def download_submission(assignment, submission, assign_dir, cookies_path):
submission_dir = assign_dir
if len(assignment.submissions) != 1:
submission_dir = os.path.join(assign_dir, str(submission.user_id))
if submission.preview_url != "":
if not os.path.exists(submission_dir):
os.makedirs(submission_dir)
submission_page_dir = os.path.join(submission_dir, "submission.html")
if not os.path.exists(submission_page_dir):
download_page(submission.preview_url, cookies_path, submission_dir, "submission.html")
if (submission.attempt != 1 and assignment.updated_url != "" and assignment.html_url != ""
and assignment.html_url.rstrip("/") != assignment.updated_url.rstrip("/")):
submission_dir = os.path.join(assign_dir, "attempts")
if not os.path.exists(submission_dir):
os.makedirs(submission_dir)
for i in range(submission.attempt):
filename = "attempt_" + str(i + 1) + ".html"
submission_page_attempt_dir = os.path.join(submission_dir, filename)
if not os.path.exists(submission_page_attempt_dir):
download_page(assignment.updated_url + "/history?version=" + str(i + 1), cookies_path, submission_dir, filename)
def download_attempt(submission: Submission, assign_dir: Path):
try:
submission_dir = assign_dir / 'submission' / str(submission.id)
submission_dir.mkdir(parents=True, exist_ok=True)
for file in submission.attachments:
file.download(submission_dir / file.display_name)
if submission.preview_url:
download_page(submission.preview_url, submission_dir, f'{submission.id}.html')
except:
traceback.print_exc()

View File

@ -12,8 +12,7 @@ def do_download(task):
task[0].download(task[1])
def download_user_files(canvas: canvasapi.Canvas, base_path: str):
base_path = Path(base_path)
def download_user_files(canvas: canvasapi.Canvas, base_path: Path):
user = canvas.get_current_user()
folders = []
for folder in user.get_folders():