# (stray version-control timestamp left by extraction: 2019-08-15 23:38:16 -06:00)
from canvasapi import Canvas
import requests
import traceback
import jsonpickle
import json
import dateutil . parser
import os
import string
# ---------------------------------------------------------------------------
# User configuration.  main() reads these module-level values directly.
# ---------------------------------------------------------------------------

# Canvas API URL
API_URL = " "
# Canvas API key
API_KEY = " "
# My Canvas User ID
USER_ID = 0000000
# Directory in which to download course information to (will be created if not present).
# Must not carry leading/trailing blanks: the other functions build paths by
# appending "/<term>/<course_code>/..." to this value.
DL_LOCATION = "./output"
# List of Course IDs that should be skipped (need to be integers)
COURSES_TO_SKIP = [288290, 512033]
class moduleItemView:
    """Plain record for a single item inside a course module.

    All three fields are strings and start out as a one-space placeholder.
    """

    # title: item title; content_type: item type string;
    # external_url: external link of the item.
    title = content_type = external_url = " "
class moduleView:
    """Plain record for a course module: its name plus its item views."""

    # Module name (placeholder until filled in).
    name = " "
    # Class-level default; every instance replaces it in __init__.
    items = []

    def __init__(self):
        # Give each instance its own list so items are never shared.
        self.items = list()
class pageView:
    """Plain record for a course page.

    Every field is a string and defaults to a one-space placeholder.
    """

    # Page title and body.
    title = body = " "
    # Creation and last-update timestamps, stored as formatted text.
    created_date = last_updated_date = " "
class topicReplyView:
    """Plain record for one reply to a discussion topic entry.

    Every field is a string and defaults to a one-space placeholder.
    """

    # Who wrote the reply, when it was posted, and its text.
    author = posted_date = body = " "
class topicEntryView:
    """Plain record for one entry of a discussion topic plus its replies."""

    # Who wrote the entry, when it was posted, and its text.
    author = posted_date = body = " "
    # Class-level default; every instance replaces it in __init__.
    topic_replies = []

    def __init__(self):
        # Give each instance its own list so replies are never shared.
        self.topic_replies = list()
class discussionView:
    """Plain record for a discussion topic (also used for announcements)."""

    # Topic title, author, formatted posting date and message text.
    title = author = posted_date = body = " "
    # Class-level default; every instance replaces it in __init__.
    topic_entries = []

    def __init__(self):
        # Give each instance its own list so entries are never shared.
        self.topic_entries = list()
class submissionView:
    """Plain record for the user's submission to one assignment.

    Every field is a string and defaults to a one-space placeholder.
    """

    # Grade and raw score of the submission.
    grade = raw_score = " "
    # Maximum points of the assignment and the comments left on the submission.
    total_possible_points = submission_comments = " "
class assignmentView:
    """Plain record for an assignment together with the user's submission."""

    # Assignment title and description.
    title = description = " "
    # Formatted creation and due dates.
    assigned_date = due_date = " "
    # Class-level default; every instance gets a fresh submissionView.
    submission = None

    def __init__(self):
        # Start with an empty submission record of its own.
        self.submission = submissionView()
class courseView:
    """Plain record for one course and the data collected for it.

    Other code in this file attaches ``pages`` and ``modules`` attributes to
    instances after construction; they are not declared here.
    """

    # Term name, course code and course name.
    term = course_code = name = " "
    # Class-level defaults; every instance replaces them in __init__.
    assignments = []
    announcements = []
    discussions = []

    def __init__(self):
        # Three distinct lists, so nothing is shared between instances.
        self.assignments, self.announcements, self.discussions = [], [], []
def makeValidFilename(input_str):
    """Return *input_str* reduced to characters that are safe in a file name.

    Only ASCII letters, digits, the space and the characters ``-_.()`` are
    kept; everything else is dropped.  Leading and trailing whitespace is then
    removed.

    Args:
        input_str: Any string (e.g. a display name reported by Canvas).

    Returns:
        The filtered string.  It may be empty when no character qualifies.
    """
    # Remove invalid characters
    valid_chars = frozenset("-_.() " + string.ascii_letters + string.digits)
    # Join with the EMPTY string: a non-empty separator would be inserted
    # between every kept character and corrupt the name.
    cleaned = "".join(c for c in input_str if c in valid_chars)

    # Remove leading and trailing whitespace
    return cleaned.strip()
def findCourseModules(course, course_view):
    """Collect the modules of *course* and download file-type module items.

    Files are written to
    ``DL_LOCATION/<term>/<course_code>/modules/<module name>/``.  A file whose
    target path already exists is not downloaded again.  All errors are
    printed and swallowed so that the export can continue.

    Args:
        course: Canvas course object; ``get_modules()`` and ``get_file()`` are
            called on it.
        course_view: View of the same course; ``term`` and ``course_code`` are
            used to build the output path.

    Returns:
        A list of ``moduleView`` objects for the modules processed before any
        module-level error occurred.
    """
    modules_dir = DL_LOCATION + "/" + course_view.term + "/" + course_view.course_code + "/modules"

    # Create modules directory if not present
    if not os.path.exists(modules_dir):
        os.makedirs(modules_dir)

    module_views = []

    try:
        for module in course.get_modules():
            module_view = moduleView()

            # Name
            module_view.name = str(module.name) if hasattr(module, "name") else " "

            try:
                # Get module items
                for module_item in module.get_module_items():
                    module_item_view = moduleItemView()

                    # Title
                    module_item_view.title = str(module_item.title) if hasattr(module_item, "title") else " "
                    # Type
                    module_item_view.content_type = str(module_item.type) if hasattr(module_item, "type") else " "
                    # External URL
                    module_item_view.external_url = str(module_item.external_url) if hasattr(module_item, "external_url") else " "

                    if module_item_view.content_type == "File":
                        module_dir = modules_dir + "/" + makeValidFilename(str(module.name))
                        _downloadModuleItemFile(course, module_item, module_dir)

                    module_view.items.append(module_item_view)
            except Exception as e:
                # A failure here abandons the remaining items of this module
                # only; the module itself is still recorded below.
                print(" Skipping module item that gave the following error: ")
                print(e)

            module_views.append(module_view)
    except Exception as e:
        print(" Skipping entire module that gave the following error: ")
        print(e)

    return module_views


def _downloadModuleItemFile(course, module_item, module_dir):
    """Download the file behind *module_item* into *module_dir* (best effort)."""
    try:
        # Create directory for current module if not present
        if not os.path.exists(module_dir):
            os.makedirs(module_dir)

        # Get the file object
        module_file = course.get_file(str(module_item.content_id))

        # Create path for module file download
        module_file_path = module_dir + "/" + makeValidFilename(str(module_file.display_name))

        # Download file if it doesn't already exist
        if not os.path.exists(module_file_path):
            module_file.download(module_file_path)
    except Exception as e:
        print(" Skipping module file download that gave the following error: ")
        print(e)
def downloadCourseFiles(course, course_view):
    """Download every file of *course* into its ``files`` output directory.

    Files are written to ``DL_LOCATION/<term>/<course_code>/files/``.  A file
    whose target path already exists is not downloaded again.  Errors are
    printed and swallowed; a failure on one file does not stop the others.

    Args:
        course: Canvas course object; ``get_files()`` is called on it.
        course_view: View of the same course; ``term`` and ``course_code`` are
            used to build the output path.
    """
    dl_dir = DL_LOCATION + "/" + course_view.term + "/" + course_view.course_code + "/files"

    # Create directory if not present
    if not os.path.exists(dl_dir):
        os.makedirs(dl_dir)

    try:
        for course_file in course.get_files():
            try:
                dl_path = dl_dir + "/" + makeValidFilename(str(course_file.display_name))

                # Download file if it doesn't already exist
                if not os.path.exists(dl_path):
                    print(' Downloading: {} '.format(dl_path))
                    course_file.download(dl_path)
            except Exception as e:
                # Keep going: one bad file must not cost us the rest.
                print(" Skipping file download that gave the following error: ")
                print(e)
    except Exception as e:
        # Listing the files (or fetching a further page of them) failed.
        print(" Skipping file download that gave the following error: ")
        print(e)
def getCoursePageUrls(course):
    """Return the ``url`` of every page of *course* as a list of strings.

    Pages without a ``url`` attribute are ignored.  Any exception raised
    while listing pages is swallowed; it is printed unless its ``message``
    attribute equals ``"Not Found"``.

    Args:
        course: Canvas course object; ``get_pages()`` is called on it.

    Returns:
        The URLs collected before any error occurred (possibly empty).
    """
    page_urls = []

    try:
        # Get all pages
        for page in course.get_pages():
            if hasattr(page, "url"):
                page_urls.append(str(page.url))
    except Exception as e:
        # Not every exception has a ``message`` attribute, so read it
        # defensively instead of raising AttributeError inside the handler.
        # NOTE(review): "Not Found" is presumably what canvasapi reports for
        # a course without pages - confirm against the library.
        if getattr(e, "message", None) != "Not Found":
            print(" Skipping page that gave the following error: ")
            print(e)

    return page_urls
def findCoursePages(course):
    """Fetch every page of *course* and return them as ``pageView`` objects.

    Args:
        course: Canvas course object; ``get_page(url)`` is called on it for
            each URL returned by ``getCoursePageUrls``.

    Returns:
        A list of ``pageView`` objects for the pages fetched before any error
        occurred.  Errors are printed, never raised.
    """
    # e.g. "August 15, 2019 11:38 PM"
    date_format = "%B %d, %Y %I:%M %p"
    page_views = []

    try:
        # Get all page URLs
        for url in getCoursePageUrls(course):
            page = course.get_page(url)

            page_view = pageView()

            # Title
            page_view.title = str(page.title) if hasattr(page, "title") else " "
            # Body
            page_view.body = str(page.body) if hasattr(page, "body") else " "
            # Date created
            page_view.created_date = dateutil.parser.parse(page.created_at).strftime(date_format) if hasattr(page, "created_at") else " "
            # Date last updated
            page_view.last_updated_date = dateutil.parser.parse(page.updated_at).strftime(date_format) if hasattr(page, "updated_at") else " "

            page_views.append(page_view)
    except Exception as e:
        print(" Skipping page download that gave the following error: ")
        print(e)

    return page_views
def findCourseAssignments(course):
    """Collect every assignment of *course* with the user's own submission.

    For each assignment the attachment URLs of all visible submissions are
    printed, then the submission of ``USER_ID`` is fetched and summarised.

    Args:
        course: Canvas course object; ``get_assignments()`` is called on it.

    Returns:
        A list of ``assignmentView`` objects for the assignments processed
        before any error occurred.  Errors are printed, never raised.
    """
    # e.g. "August 15, 2019 11:38 PM"
    date_format = "%B %d, %Y %I:%M %p"
    assignment_views = []

    try:
        # Get all assignments
        for assignment in course.get_assignments():
            print(assignment)

            # Create a new assignment view
            assignment_view = assignmentView()

            # Title
            assignment_view.title = str(assignment.name) if hasattr(assignment, "name") else " "
            # Description
            assignment_view.description = str(assignment.description) if hasattr(assignment, "description") else " "
            # Assigned date
            assignment_view.assigned_date = assignment.created_at_date.strftime(date_format) if hasattr(assignment, "created_at_date") else " "
            # Due date
            assignment_view.due_date = assignment.due_at_date.strftime(date_format) if hasattr(assignment, "due_at_date") else " "

            # List the attachment URLs of all submissions (nothing is downloaded)
            _printSubmissionAttachmentUrls(assignment)

            # Get my user's submission object
            submission = assignment.get_submission(USER_ID)

            # Create a new submission view
            assignment_view.submission = submissionView()

            # My grade
            assignment_view.submission.grade = str(submission.grade) if hasattr(submission, "grade") else " "
            # My raw score
            assignment_view.submission.raw_score = str(submission.score) if hasattr(submission, "score") else " "
            # Total possible score
            assignment_view.submission.total_possible_points = str(assignment.points_possible) if hasattr(assignment, "points_possible") else " "
            # Submission comments
            assignment_view.submission.submission_comments = str(submission.submission_comments) if hasattr(submission, "submission_comments") else " "

            assignment_views.append(assignment_view)
    except Exception as e:
        print(" Skipping assignment that gave the following error: ")
        print(e)

    return assignment_views


def _printSubmissionAttachmentUrls(assignment):
    """Print the attachment URLs of every submission of *assignment* (best effort)."""
    try:
        submissions = assignment.get_submissions()
        print(submissions)

        # The loop stays inside the try on purpose.
        # NOTE(review): canvasapi list results appear to be fetched lazily, so
        # a request error would surface while iterating - confirm.
        for submission in submissions:
            print(submission)
            try:
                attachments = submission.attachments
            except AttributeError:
                print(' No attachements ')
            else:
                for attachment in attachments:
                    print(attachment["url"])
    except Exception:
        print(" Got no submissions for this assignment ")
def findCourseAnnouncements(course):
    """Return a discussion view for every announcement of *course*.

    Announcements are requested via
    ``course.get_discussion_topics(only_announcements=True)`` and converted
    with ``getDiscussionView``.  On error the problem is printed and the
    views gathered so far are returned.
    """
    collected = []

    try:
        for topic in course.get_discussion_topics(only_announcements=True):
            collected.append(getDiscussionView(topic))
    except Exception as err:
        print(" Skipping announcement that gave the following error: ")
        print(err)

    return collected
def getDiscussionView(discussion_topic):
    """Build a ``discussionView`` from a Canvas discussion topic.

    Copies title, author, posting date and message of the topic.  When the
    topic reports at least one sub-entry, its entries and the replies to each
    entry are collected as well (one level of replies only).

    Args:
        discussion_topic: Canvas discussion topic (or announcement) object.

    Returns:
        The populated ``discussionView``.  Errors raised while enumerating
        entries or replies are printed and swallowed.
    """
    # e.g. "August 15, 2019 11:38 PM"
    date_format = "%B %d, %Y %I:%M %p"

    # Create discussion view
    discussion_view = discussionView()

    # Title
    discussion_view.title = str(discussion_topic.title) if hasattr(discussion_topic, "title") else " "
    # Author
    discussion_view.author = str(discussion_topic.user_name) if hasattr(discussion_topic, "user_name") else " "
    # Posted date
    discussion_view.posted_date = discussion_topic.created_at_date.strftime(date_format) if hasattr(discussion_topic, "created_at_date") else " "
    # Body
    discussion_view.body = str(discussion_topic.message) if hasattr(discussion_topic, "message") else " "

    # Topic entries
    has_entries = (
        hasattr(discussion_topic, "discussion_subentry_count")
        and discussion_topic.discussion_subentry_count > 0
    )
    if not has_entries:
        return discussion_view

    # Need to get replies to entries recursively?
    discussion_topic_entries = discussion_topic.get_topic_entries()

    try:
        for topic_entry in discussion_topic_entries:
            # Create new discussion view for the topic_entry
            topic_entry_view = topicEntryView()

            # Author
            topic_entry_view.author = str(topic_entry.user_name) if hasattr(topic_entry, "user_name") else " "
            # Posted date
            topic_entry_view.posted_date = topic_entry.created_at_date.strftime(date_format) if hasattr(topic_entry, "created_at_date") else " "
            # Body
            topic_entry_view.body = str(topic_entry.message) if hasattr(topic_entry, "message") else " "

            # Get this topic's replies
            topic_entry_replies = topic_entry.get_replies()

            try:
                for topic_reply in topic_entry_replies:
                    # Create new topic reply view
                    topic_reply_view = topicReplyView()

                    # Author
                    topic_reply_view.author = str(topic_reply.user_name) if hasattr(topic_reply, "user_name") else " "
                    # Posted Date
                    topic_reply_view.posted_date = topic_reply.created_at_date.strftime(date_format) if hasattr(topic_reply, "created_at_date") else " "
                    # Body - stored on ``body``, the field topicReplyView
                    # declares (same convention as topicEntryView above).
                    topic_reply_view.body = str(topic_reply.message) if hasattr(topic_reply, "message") else " "

                    topic_entry_view.topic_replies.append(topic_reply_view)
            except Exception as e:
                print(" Tried to enumerate discussion topic entry replies but received the following error: ")
                print(e)

            discussion_view.topic_entries.append(topic_entry_view)
    except Exception as e:
        print(" Tried to enumerate discussion topic entries but received the following error: ")
        print(e)

    return discussion_view
def findCourseDiscussions(course):
    """Return a discussion view for every discussion topic of *course*.

    Topics come from ``course.get_discussion_topics()`` and are converted
    with ``getDiscussionView``.  On error the problem is printed and the
    views gathered so far are returned.
    """
    collected = []

    try:
        for topic in course.get_discussion_topics():
            collected.append(getDiscussionView(topic))
    except Exception as err:
        print(" Skipping discussion that gave the following error: ")
        print(err)

    return collected
def getCourseView(course):
    """Build a ``courseView`` for *course* and fill it with exported data.

    Reads term name, course code and course name from the course object, then
    collects assignments, announcements, discussions and pages through the
    corresponding ``findCourse*`` functions, printing progress along the way.

    Args:
        course: Canvas course object.

    Returns:
        The populated ``courseView`` (including a ``pages`` attribute).
    """
    course_view = courseView()

    # Course term
    course_view.term = course.term["name"] if hasattr(course, "term") and "name" in course.term else " "
    # Course code
    course_view.course_code = course.course_code if hasattr(course, "course_code") else " "
    # Course name
    course_view.name = course.name if hasattr(course, "name") else " "

    print(" Working on " + course_view.term + " : " + course_view.name)

    # Course assignments
    print(" Getting assignments ")
    course_view.assignments = findCourseAssignments(course)

    # Course announcements
    print(" Getting announcements ")
    course_view.announcements = findCourseAnnouncements(course)

    # Course discussions
    print(" Getting discussions ")
    course_view.discussions = findCourseDiscussions(course)

    # Course pages
    print(" Getting pages ")
    course_view.pages = findCoursePages(course)

    return course_view
def exportAllCourseData(course_view):
    """Write *course_view* as indented JSON into the course output directory.

    The file is ``DL_LOCATION/<term>/<course_code>/<course_code>.json``; the
    directory is created when missing and an existing file is overwritten.

    Args:
        course_view: The ``courseView`` to serialise; ``term`` and
            ``course_code`` determine the output path.
    """
    # jsonpickle produces compact JSON; round-trip it through the stdlib json
    # module to get an indented, human-readable document.
    json_str = json.dumps(json.loads(jsonpickle.encode(course_view, unpicklable=False)), indent=4)

    course_output_dir = DL_LOCATION + "/" + course_view.term + "/" + course_view.course_code

    # Create directory if not present
    if not os.path.exists(course_output_dir):
        os.makedirs(course_output_dir)

    course_output_path = course_output_dir + "/" + course_view.course_code + ".json"

    with open(course_output_path, "w") as out_file:
        out_file.write(json_str)
def main():
    """Export the data of every accessible Canvas course to ``DL_LOCATION``.

    Connects with the module-level ``API_URL`` / ``API_KEY``, walks all
    courses except those listed in ``COURSES_TO_SKIP``, writes one JSON file
    per course plus downloaded files, and finally writes the combined
    ``all_output.json``.  A failure inside one course is printed and the run
    continues with the next course.
    """
    global API_URL, API_KEY, USER_ID

    print(" Welcome to the Canvas Student Data Export Tool \n ")

    # Canvas API URL
    print(" We will need your organization ' s Canvas Base URL. This is probably something like https:// {schoolName} .instructure.com) ")
    # Interactive prompt currently disabled; the module-level value is used.
    # API_URL = input("Enter your organization's Canvas Base URL: ")

    # Canvas API key
    print(" \n We will need a valid API key for your user. You can generate one in Canvas once you are logged in. ")
    # Interactive prompt currently disabled; the module-level value is used.
    # API_KEY = input("Enter a valid API key for your user: ")

    # My Canvas User ID
    print(" \n We will need your Canvas User ID. You can find this by logging in to canvas and then going to this URL in the same browser {yourCanvasBaseUrl} /api/v1/users/self ")
    # Interactive prompt currently disabled; the module-level value is used.
    # USER_ID = input("Enter your Canvas User ID: ")

    print(" \n Connecting to canvas \n ")

    # Initialize a new Canvas object
    canvas = Canvas(API_URL, API_KEY)

    print(" Creating output directory: " + DL_LOCATION + " \n ")
    # Create directory if not present
    if not os.path.exists(DL_LOCATION):
        os.makedirs(DL_LOCATION)

    all_courses_views = []

    try:
        print(" Getting list of all courses \n ")
        courses = canvas.get_courses(include="term")

        # Courses the user cannot (or does not want to) export are listed in
        # COURSES_TO_SKIP, e.g. ones the account is not authorized to access.
        skip = set(COURSES_TO_SKIP)

        for course in courses:
            if course.id in skip:
                continue

            # Isolate each course so that one failure really only skips that
            # course, as the message below promises.
            try:
                course_view = getCourseView(course)
                all_courses_views.append(course_view)

                print(" Downloading all files ")
                downloadCourseFiles(course, course_view)

                print(" Getting modules and downloading module files ")
                course_view.modules = findCourseModules(course, course_view)

                print(" Exporting all course data ")
                exportAllCourseData(course_view)
            except Exception as e:
                print(" Skipping entire course that gave the following error: ")
                print(e)
    except Exception as e:
        # Listing the courses (or fetching a further page of them) failed.
        print(" Skipping entire course that gave the following error: ")
        print(e)

    print(" Exporting data from all courses combined as one file: all_output.json ")
    # Awful hack to make the JSON pretty. Decode it with Python stdlib json module then re-encode with indentation
    json_str = json.dumps(json.loads(jsonpickle.encode(all_courses_views, unpicklable=False)), indent=4)

    all_output_path = DL_LOCATION + "/all_output.json"

    with open(all_output_path, "w") as out_file:
        out_file.write(json_str)

    print(" \n Process complete. All canvas data exported! ")
# Script entry point: run the export and report anything main() did not
# handle itself, including the full traceback for debugging.
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(" Exiting due to uncaught exception: ")
        print(e)
        print(traceback.format_exc())