Added ability to download all submissiosn from students and load credentials from file #1

Merged
moorepants merged 5 commits from all-assignments into master 2020-07-13 22:40:08 -06:00
1 changed files with 257 additions and 95 deletions

352
export.py
View File

@ -1,28 +1,46 @@
from canvasapi import Canvas
import requests
import traceback
import jsonpickle
# built in
import json
import dateutil.parser
import os
import string
# Canvas API URL
API_URL = ""
# Canvas API key
API_KEY = ""
# My Canvas User ID
USER_ID = 0000000
# Directory in which to download course information to (will be created if not present)
# external
from canvasapi import Canvas
from canvasapi.exceptions import ResourceDoesNotExist
import dateutil.parser
import jsonpickle
import requests
import yaml
try:
with open("credentials.yaml", 'r') as f:
credentials = yaml.load(f)
except OSError:
# Canvas API URL
API_URL = ""
# Canvas API key
API_KEY = ""
# My Canvas User ID
USER_ID = 0000000
else:
API_URL = credentials["API_URL"]
API_KEY = credentials["API_KEY"]
USER_ID = credentials["USER_ID"]
# Directory in which to download course information to (will be created if not
# present)
DL_LOCATION = "./output"
# List of Course IDs that should be skipped
COURSES_TO_SKIP = []
# List of Course IDs that should be skipped (need to be integers)
COURSES_TO_SKIP = [288290, 512033]
DATE_TEMPLATE = "%B %d, %Y %I:%M %p"
class moduleItemView():
title = ""
content_type = ""
external_url = ""
class moduleView():
name = ""
items = []
@ -30,17 +48,20 @@ class moduleView():
def __init__(self):
self.items = []
class pageView():
title = ""
body = ""
created_date = ""
last_updated_date = ""
class topicReplyView():
author = ""
posted_date = ""
body = ""
class topicEntryView():
author = ""
posted_date = ""
@ -50,6 +71,7 @@ class topicEntryView():
def __init__(self):
self.topic_replies = []
class discussionView():
title = ""
author = ""
@ -60,11 +82,34 @@ class discussionView():
def __init__(self):
self.topic_entries = []
class submissionView():
attachments = []
grade = ""
raw_score = ""
total_possible_points = ""
submission_comments = ""
total_possible_points = ""
user_id = "no-id"
def __init__(self):
self.attachments = []
self.grade = ""
self.raw_score = ""
self.submission_comments = ""
self.total_possible_points = ""
self.user_id = None # integer
class attachmentView():
filename = ""
id = 0
url = ""
def __init__(self):
self.filename = ""
self.id = 0
self.url = ""
class assignmentView():
title = ""
@ -72,9 +117,12 @@ class assignmentView():
assigned_date = ""
due_date = ""
submission = None
submissions = []
def __init__(self):
self.submission = submissionView()
self.submissions = []
class courseView():
term = ""
@ -89,6 +137,7 @@ class courseView():
self.announcements = []
self.discussions = []
def makeValidFilename(input_str):
# Remove invalid characters
valid_chars = "-_.() %s%s" % (string.ascii_letters, string.digits)
@ -99,8 +148,10 @@ def makeValidFilename(input_str):
return input_str
def findCourseModules(course, course_view):
modules_dir = DL_LOCATION + "/" + course_view.term + "/" + course_view.course_code + "/modules"
modules_dir = os.path.join(DL_LOCATION, course_view.term,
course_view.course_code, "modules")
# Create modules directory if not present
if not os.path.exists(modules_dir):
@ -160,15 +211,17 @@ def findCourseModules(course, course_view):
print(e)
module_views.append(module_view)
except Exception as e:
print("Skipping entire module that gave the following error:")
print(e)
return module_views
def downloadCourseFiles(course, course_view):
dl_dir = DL_LOCATION + "/" + course_view.term + "/" + course_view.course_code + "/files"
dl_dir = os.path.join(DL_LOCATION, course_view.term,
course_view.course_code, "files")
# Create directory if not present
if not os.path.exists(dl_dir):
@ -178,15 +231,44 @@ def downloadCourseFiles(course, course_view):
files = course.get_files()
for file in files:
dl_path = dl_dir + "/" + makeValidFilename(str(file.display_name))
dl_path = os.path.join(dl_dir,
makeValidFilename(str(file.display_name)))
# Download file if it doesn't already exist
if not os.path.exists(dl_path):
print('Downloading: {}'.format(dl_path))
file.download(dl_path)
except Exception as e:
print("Skipping file download that gave the following error:")
print(e)
def download_submission_attachments(course, course_view):
course_dir = os.path.join(DL_LOCATION, course_view.term,
course_view.course_code)
# Create directory if not present
if not os.path.exists(course_dir):
os.makedirs(course_dir)
for assignment in course_view.assignments:
for submission in assignment.submissions:
attachment_dir = os.path.join(course_dir, assignment.title,
str(submission.user_id))
if not os.path.exists(attachment_dir):
os.makedirs(attachment_dir)
for attachment in submission.attachments:
filepath = os.path.join(attachment_dir, str(attachment.id) +
"_" + attachment.filename)
if not os.path.exists(filepath):
print('Downloading attachment: {}'.format(filepath))
r = requests.get(attachment.url, allow_redirects=True)
with open(filepath, 'wb') as f:
f.write(r.content)
else:
print('File already exists: {}'.format(filepath))
def getCoursePageUrls(course):
page_urls = []
@ -201,9 +283,10 @@ def getCoursePageUrls(course):
if e.message != "Not Found":
print("Skipping page that gave the following error:")
print(e)
return page_urls
def findCoursePages(course):
page_views = []
@ -221,40 +304,114 @@ def findCoursePages(course):
# Body
page_view.body = str(page.body) if hasattr(page, "body") else ""
# Date created
page_view.created_date = dateutil.parser.parse(page.created_at).strftime("%B %d, %Y %I:%M %p") if hasattr(page, "created_at") else ""
if hasattr(page, "created_at"):
page_view.created_date = dateutil.parser.parse(
page.created_at).strftime(DATE_TEMPLATE)
else:
page_view.created_date = ""
# Date last updated
page_view.last_updated_date = dateutil.parser.parse(page.updated_at).strftime("%B %d, %Y %I:%M %p") if hasattr(page, "updated_at") else ""
if hasattr(page, "updated_at"):
page_view.last_updated_date = dateutil.parser.parse(
page.updated_at).strftime(DATE_TEMPLATE)
else:
page_view.last_updated_date = ""
page_views.append(page_view)
except Exception as e:
print("Skipping page download that gave the following error:")
print(e)
return page_views
def findCourseAssignments(course):
assignment_views = []
try:
# Get all assignments
assignments = course.get_assignments()
# Get all assignments
assignments = course.get_assignments()
for assignment in assignments:
# Create a new assignment view
assignment_view = assignmentView()
for assignment in assignments:
# Create a new assignment view
assignment_view = assignmentView()
# Title
assignment_view.title = str(assignment.name) if hasattr(assignment, "name") else ""
# Description
assignment_view.description = str(assignment.description) if hasattr(assignment, "description") else ""
# Assigned date
assignment_view.assigned_date = assignment.created_at_date.strftime("%B %d, %Y %I:%M %p") if hasattr(assignment, "created_at_date") else ""
# Due date
assignment_view.due_date = assignment.due_at_date.strftime("%B %d, %Y %I:%M %p") if hasattr(assignment, "due_at_date") else ""
# Title
if hasattr(assignment, "name"):
assignment_view.title = str(assignment.name)
else:
assignment_view.title = ""
# Description
if hasattr(assignment, "description"):
assignment_view.description = str(assignment.description)
else:
assignment_view.description = ""
# Assigned date
if hasattr(assignment, "created_at_date"):
assignment_view.assigned_date = assignment.created_at_date.strftime(DATE_TEMPLATE)
else:
assignment_view.assigned_date = ""
# Due date
if hasattr(assignment, "due_at_date"):
assignment_view.due_date = assignment.due_at_date.strftime(DATE_TEMPLATE)
else:
assignment_view.due_date = ""
# Get my user"s submission object
# Download all submissions
try:
submissions = assignment.get_submissions()
# TODO : Figure out the exact error raised
except:
print("Got no submissions for this assignment")
else:
for submission in submissions:
sub_view = submissionView()
# My grade
if hasattr(submission, "grade"):
sub_view.grade = str(submission.grade)
else:
sub_view.grade = ""
# My raw score
if hasattr(submission, "score"):
sub_view.raw_score = str(submission.score)
else:
sub_view.raw_score = ""
# Total possible score
if hasattr(assignment, "points_possible"):
sub_view.total_possible_points = str(assignment.points_possible)
else:
sub_view.total_possible_points = ""
# Submission comments
if hasattr(submission, "submission_comments"):
sub_view.submission_comments = str(submission.submission_comments)
else:
sub_view.submission_comments = ""
if hasattr(submission, "user_id"):
sub_view.user_id = str(submission.user_id)
else:
sub_view.user_id = "no-id"
try:
submission.attachments
except AttributeError:
print('No attachments')
else:
for attachment in submission.attachments:
attach_view = attachmentView()
attach_view.url = attachment["url"]
attach_view.id = attachment["id"]
attach_view.filename = attachment["filename"]
sub_view.attachments.append(attach_view)
assignment_view.submissions.append(sub_view)
# The following is only useful if you are a student in the class.
# Get my user"s submission object
try:
submission = assignment.get_submission(USER_ID)
except ResourceDoesNotExist:
print('No submission for user: {}'.format(USER_ID))
else:
# Create a new submission view
assignment_view.submission = submissionView()
@ -267,13 +424,12 @@ def findCourseAssignments(course):
# Submission comments
assignment_view.submission.submission_comments = str(submission.submission_comments) if hasattr(submission, "submission_comments") else ""
assignment_views.append(assignment_view)
except Exception as e:
print("Skipping assignment that gave the following error:")
print(e)
assignment_views.append(assignment_view)
return assignment_views
def findCourseAnnouncements(course):
announcement_views = []
@ -287,9 +443,10 @@ def findCourseAnnouncements(course):
except Exception as e:
print("Skipping announcement that gave the following error:")
print(e)
return announcement_views
def getDiscussionView(discussion_topic):
# Create discussion view
discussion_view = discussionView()
@ -312,7 +469,7 @@ def getDiscussionView(discussion_topic):
for topic_entry in discussion_topic_entries:
# Create new discussion view for the topic_entry
topic_entry_view = topicEntryView()
# Author
topic_entry_view.author = str(topic_entry.user_name) if hasattr(topic_entry, "user_name") else ""
# Posted date
@ -347,6 +504,7 @@ def getDiscussionView(discussion_topic):
return discussion_view
def findCourseDiscussions(course):
discussion_views = []
@ -364,6 +522,7 @@ def findCourseDiscussions(course):
return discussion_views
def getCourseView(course):
course_view = courseView()
@ -396,40 +555,49 @@ def getCourseView(course):
return course_view
def exportAllCourseData(course_view):
json_str = json.dumps(json.loads(jsonpickle.encode(course_view, unpicklable = False)), indent = 4)
course_output_dir = DL_LOCATION + "/" + course_view.term + "/" + course_view.course_code
course_output_dir = os.path.join(DL_LOCATION, course_view.term,
course_view.course_code)
# Create directory if not present
if not os.path.exists(course_output_dir):
os.makedirs(course_output_dir)
course_output_path = course_output_dir + "/" + course_view.course_code + ".json"
course_output_path = os.path.join(course_output_dir,
course_view.course_code + ".json")
with open(course_output_path, "w") as out_file:
out_file.write(json_str)
def main():
if __name__ == "__main__":
print("Welcome to the Canvas Student Data Export Tool\n")
# Canvas API URL
print("We will need your organization's Canvas Base URL. This is probably something like https://{schoolName}.instructure.com)")
global API_URL
API_URL = input("Enter your organization's Canvas Base URL: ")
if API_URL == "":
# Canvas API URL
print("We will need your organization's Canvas Base URL. This is "
"probably something like https://{schoolName}.instructure.com)")
API_URL = input("Enter your organization's Canvas Base URL: ")
# Canvas API key
print("\nWe will need a valid API key for your user. You can generate one in Canvas once you are logged in.")
global API_KEY
API_KEY = input("Enter a valid API key for your user: ")
# My Canvas User ID
print("\nWe will need your Canvas User ID. You can find this by logging in to canvas and then going to this URL in the same browser {yourCanvasBaseUrl}/api/v1/users/self")
global USER_ID
USER_ID = input("Enter your Canvas User ID: ")
if API_KEY == "":
# Canvas API key
print("\nWe will need a valid API key for your user. You can generate "
"one in Canvas once you are logged in.")
API_KEY = input("Enter a valid API key for your user: ")
if USER_ID == 0000000:
# My Canvas User ID
print("\nWe will need your Canvas User ID. You can find this by "
"logging in to canvas and then going to this URL in the same "
"browser {yourCanvasBaseUrl}/api/v1/users/self")
USER_ID = input("Enter your Canvas User ID: ")
print("\nConnecting to canvas\n")
# Initialize a new Canvas object
canvas = Canvas(API_URL, API_KEY)
@ -437,51 +605,45 @@ def main():
# Create directory if not present
if not os.path.exists(DL_LOCATION):
os.makedirs(DL_LOCATION)
all_courses_views = []
try:
print("Getting list of all courses\n")
courses = canvas.get_courses(include="term")
print("Getting list of all courses\n")
courses = canvas.get_courses(include="term")
# I am not authorized to access course 1083083
skip = set(COURSES_TO_SKIP)
skip = set(COURSES_TO_SKIP)
for course in courses:
if course.id in skip:
continue
for course in courses:
if course.id in skip:
continue
course_view = getCourseView(course)
course_view = getCourseView(course)
all_courses_views.append(course_view)
all_courses_views.append(course_view)
print(" Downloading all files")
downloadCourseFiles(course, course_view)
print(" Downloading all files")
downloadCourseFiles(course, course_view)
print(" Getting modules and downloading module files")
course_view.modules = findCourseModules(course, course_view)
print(" Downloading submission attachments")
download_submission_attachments(course, course_view)
print(" Exporting all course data")
exportAllCourseData(course_view)
except Exception as e:
print("Skipping entire course that gave the following error:")
print(e)
print("Exporting data from all courses combined as one file: all_output.json")
# Awful hack to make the JSON pretty. Decode it with Python stdlib json module then re-encode with indentation
json_str = json.dumps(json.loads(jsonpickle.encode(all_courses_views, unpicklable = False)), indent = 4)
print(" Getting modules and downloading module files")
course_view.modules = findCourseModules(course, course_view)
all_output_path = DL_LOCATION + "/all_output.json"
print(" Exporting all course data")
exportAllCourseData(course_view)
print("Exporting data from all courses combined as one file: "
"all_output.json")
# Awful hack to make the JSON pretty. Decode it with Python stdlib json
# module then re-encode with indentation
json_str = json.dumps(json.loads(jsonpickle.encode(all_courses_views,
unpicklable=False)),
indent=4)
all_output_path = os.path.join(DL_LOCATION, "all_output.json")
with open(all_output_path, "w") as out_file:
out_file.write(json_str)
print("\nProcess complete. All canvas data exported!")
if __name__ == "__main__":
try:
main()
except Exception as e:
print("Exiting due to uncaught exception:")
print(e)
print(traceback.format_exc())