#!/usr/bin/env python3 import argparse import itertools import logging import os import random import re import sys import warnings from pathlib import Path from queue import Queue from bs4 import BeautifulSoup from email_validator import EmailNotValidError, validate_email from scraper import email_spammer, log from scraper.email_spammer.phrases import SUBJECT_PHRASES_CYCLE from scraper.email_spammer.scrape import EMAIL_PATTERN, search_4chan from scraper.email_spammer.send import EmailWorker from scraper.email_spammer.templates import FAKE_LGA, NORMAL_EMAIL from scraper.helpers import resolve_path, select_random_file DESU_EMAIL_DOMAINS = { 'proton.me': 0, 'protonmail.com': 14, # 'gmail.com': 13 } logger: logging.Logger script_dir = os.path.dirname(os.path.realpath(__file__)) def clean_email(email: str): email = re.sub(r'\[|\]', '', email) if re.match(EMAIL_PATTERN, email): email = email.lower() email = email.replace('>', '').replace('<', '').replace('[spoiler:lit]', '') return email return None def test_email(email: str): try: emailinfo = validate_email(email, check_deliverability=True) return emailinfo.normalized except EmailNotValidError as e: return False def search_for_proton_emails(text): soup = BeautifulSoup(text, 'html.parser') text = soup.find_all("div", {"class": "text"}) found = set() for item in text: m = re.findall(r'[^ \n]*?@proton\.me', item.text) # return found def main(): global logger parser = argparse.ArgumentParser(description='') parser.add_argument('--action', choices=['scrape', 'send', 'both'], help='What to do.') parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging.') parser.add_argument('--pages', default=165, type=int, help='How many pages to scrape on desuarchive.org') parser.add_argument('--workers-per-email', default=1, type=int, help='How many workers to start per sender email address (Default: 1)') parser.add_argument('--cache', default='found-emails.txt', help='Path to the cached email file.') parser.add_argument('--image-dir', default=None, help='Path to the directory containing the images to send.') # parser.add_argument('--cached-only', action='store_true', help="Only do cached emails, don't search for new ones.") # parser.add_argument('--scrape-only', action='store_true', help="Only scrape and cache emails.") parser.add_argument('--test', '-t', action='store_true', help="Send a test email to myself,") parser.add_argument('--desu', action='store_true', help="Scrape desuarchive.org too.") parser.add_argument('--send-lga', action='store_true', help="Send the fake LGA email.") parser.add_argument('--sender', help="Send from this email address.") parser.add_argument('--target', help="Send to only this email.") parser.add_argument('--target-count', type=int, help="Send this many emails to the target.") args = parser.parse_args() if args.debug: log_level = logging.DEBUG else: log_level = logging.INFO log.root_logger.init(log_level) logger = log.root_logger.logger cached_emails_file = resolve_path(args.cache) cached_emails_file.touch() image_dir: Path = None if args.action in ['send', 'both']: if not args.image_dir: logger.critical('Must supply --image-dir when sending emails') sys.exit(1) image_dir = resolve_path(args.image_dir) if not image_dir.is_dir(): logger.critical(f'Image dir does not exist: {image_dir}') cached_emails = set(cached_emails_file.read_text().splitlines()) found_emails = set() found_4chan = set() if not args.action: logger.critical('Must specify what to do using --action') sys.exit(1) logger.info(f'Using cached emails file with {len(cached_emails)} emails: {cached_emails_file}') if args.action in ['scrape', 'both']: logger.info('Searching the current /aicg/ thread...') found_emails_from_4chan = search_4chan() for email in found_emails_from_4chan: email = clean_email(email) if email not in cached_emails: logger.info(f'NEW: {email}') found_4chan.add(email) diff_4chan = found_4chan.difference(cached_emails) logger.info(f'Found {len(found_emails_from_4chan)} emails from the latest thread(s), added {len(diff_4chan)} new emails.') if args.desu: logger.info('Scraping desuarchive.org') # Have to use selenium since CloudFlare hides email addresses in the raw HTML. from selenium import webdriver from selenium.webdriver.chrome.options import Options from webdriver_manager.chrome import ChromeDriverManager # https://github.com/SeleniumHQ/selenium/issues/9995#issuecomment-987921515 options = Options() options.binary_location = "/usr/bin/google-chrome" # options.headless = True options.add_argument('--headless') options.add_argument("--window-size=1920,1200") with warnings.catch_warnings(): # Silence selinium complaining about binary_location like a fucking nigger faggot warnings.filterwarnings("ignore", category=DeprecationWarning) driver = webdriver.Chrome(ChromeDriverManager().install(), options=options) for query, query_max_page in DESU_EMAIL_DOMAINS.items(): if query_max_page > 0: if args.pages < query_max_page: max_page = args.pages else: max_page = query_max_page else: max_page = args.pages logger.info(f'Searching {max_page} pages for {query}') for i in range(1, max_page): url = f'https://desuarchive.org/g/search/text/%22%40{query}%22/page/{i}/' driver.get(url) source = driver.page_source soup = BeautifulSoup(source, 'html.parser') text = soup.find_all("div", {"class": "text"}) for item in text: matches = re.findall(rf"[^ \n>(]*?(?:@{'|'.join(DESU_EMAIL_DOMAINS)})", str(item)) for m in matches: email = clean_email(m) if email not in cached_emails: logger.info(f'NEW: {email}') found_emails.add(email) driver.quit() diff = len(found_emails.difference(cached_emails)) logger.info(f'Found {len(found_emails)} emails, added {diff} new emails.') emails_file_content = found_emails | cached_emails | found_4chan sorted_file_content = sorted(emails_file_content) with open(cached_emails_file, 'w') as file: for email in sorted_file_content: cleaned = clean_email(email) if cleaned: file.write(cleaned + '\n') logger.debug('Wrote to cache file.') if args.action in ['send', 'both']: emails_file_content = found_emails | cached_emails | found_4chan email_list = [x for x in [*emails_file_content, *emails_file_content, *emails_file_content] if x != ''] random.shuffle(email_list) if args.test: if args.target: args.target = 'yourtestemail@shit.fuck' else: # Used to load a million email addresses to send to, but the script was modified to troll LGA and Sam Cole. email_list = ['youremailaddress@shit.fuck'] email_queue = Queue(maxsize=0) logger.info(f'Starting {args.workers_per_email} workers and logging into SMTP...') for i in range(args.workers_per_email): worker = EmailWorker(email_queue, args.sender if not args.send_lga else 'sam.cole.404media@bestmail.us') # 'lgaburner@cluemail.com') worker.start() dir_list = list(Path(image_dir).rglob("*.*")) if not args.target: logger.info(f'Sending {len(email_list)} emails...') for to_email in email_list: if not args.send_lga: d = (to_email, next(SUBJECT_PHRASES_CYCLE), NORMAL_EMAIL, select_random_file(image_dir, dir_list)) else: d = (to_email, 'Chub.ai Interview Request', FAKE_LGA, None) # 'Invite to my discord and a token to our proxy' email_queue.put(d) else: logger.info(f'Sending {args.target_count} emails...') for i in range(args.target_count): email_queue.put((args.target, next(SUBJECT_PHRASES_CYCLE), NORMAL_EMAIL, select_random_file(image_dir, dir_list))) email_queue.join() if __name__ == '__main__': main()