This repository has been archived on 2024-06-09. You can view files and clone it, but cannot push or open issues or pull requests.
catspam/email-spammer.py

211 lines
8.8 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import itertools
import logging
import os
import random
import re
import sys
import warnings
from pathlib import Path
from queue import Queue
from bs4 import BeautifulSoup
from email_validator import EmailNotValidError, validate_email
from scraper import email_spammer, log
from scraper.email_spammer.phrases import SUBJECT_PHRASES_CYCLE
from scraper.email_spammer.scrape import EMAIL_PATTERN, search_4chan
from scraper.email_spammer.send import EmailWorker
from scraper.email_spammer.templates import FAKE_LGA, NORMAL_EMAIL
from scraper.helpers import resolve_path, select_random_file
DESU_EMAIL_DOMAINS = {
'proton.me': 0,
'protonmail.com': 14,
# 'gmail.com': 13
}
logger: logging.Logger
script_dir = os.path.dirname(os.path.realpath(__file__))
def clean_email(email: str):
email = re.sub(r'\[|\]', '', email)
if re.match(EMAIL_PATTERN, email):
email = email.lower()
email = email.replace('>', '').replace('<', '').replace('[spoiler:lit]', '')
return email
return None
def test_email(email: str):
try:
emailinfo = validate_email(email, check_deliverability=True)
return emailinfo.normalized
except EmailNotValidError as e:
return False
def search_for_proton_emails(text):
soup = BeautifulSoup(text, 'html.parser')
text = soup.find_all("div", {"class": "text"})
found = set()
for item in text:
m = re.findall(r'[^ \n]*?@proton\.me', item.text)
# return found
def main():
global logger
parser = argparse.ArgumentParser(description='')
parser.add_argument('--action', choices=['scrape', 'send', 'both'], help='What to do.')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging.')
parser.add_argument('--pages', default=165, type=int, help='How many pages to scrape on desuarchive.org')
parser.add_argument('--workers-per-email', default=1, type=int, help='How many workers to start per sender email address (Default: 1)')
parser.add_argument('--cache', default='found-emails.txt', help='Path to the cached email file.')
parser.add_argument('--image-dir', default=None, help='Path to the directory containing the images to send.')
# parser.add_argument('--cached-only', action='store_true', help="Only do cached emails, don't search for new ones.")
# parser.add_argument('--scrape-only', action='store_true', help="Only scrape and cache emails.")
parser.add_argument('--test', '-t', action='store_true', help="Send a test email to myself,")
parser.add_argument('--desu', action='store_true', help="Scrape desuarchive.org too.")
parser.add_argument('--send-lga', action='store_true', help="Send the fake LGA email.")
parser.add_argument('--sender', help="Send from this email address.")
parser.add_argument('--target', help="Send to only this email.")
parser.add_argument('--target-count', type=int, help="Send this many emails to the target.")
args = parser.parse_args()
if args.debug:
log_level = logging.DEBUG
else:
log_level = logging.INFO
log.root_logger.init(log_level)
logger = log.root_logger.logger
cached_emails_file = resolve_path(args.cache)
cached_emails_file.touch()
image_dir: Path = None
if args.action in ['send', 'both']:
if not args.image_dir:
logger.critical('Must supply --image-dir when sending emails')
sys.exit(1)
image_dir = resolve_path(args.image_dir)
if not image_dir.is_dir():
logger.critical(f'Image dir does not exist: {image_dir}')
cached_emails = set(cached_emails_file.read_text().splitlines())
found_emails = set()
found_4chan = set()
if not args.action:
logger.critical('Must specify what to do using --action')
sys.exit(1)
logger.info(f'Using cached emails file with {len(cached_emails)} emails: {cached_emails_file}')
if args.action in ['scrape', 'both']:
logger.info('Searching the current /aicg/ thread...')
found_emails_from_4chan = search_4chan()
for email in found_emails_from_4chan:
email = clean_email(email)
if email not in cached_emails:
logger.info(f'NEW: {email}')
found_4chan.add(email)
diff_4chan = found_4chan.difference(cached_emails)
logger.info(f'Found {len(found_emails_from_4chan)} emails from the latest thread(s), added {len(diff_4chan)} new emails.')
if args.desu:
logger.info('Scraping desuarchive.org')
# Have to use selenium since CloudFlare hides email addresses in the raw HTML.
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
# https://github.com/SeleniumHQ/selenium/issues/9995#issuecomment-987921515
options = Options()
options.binary_location = "/usr/bin/google-chrome"
# options.headless = True
options.add_argument('--headless')
options.add_argument("--window-size=1920,1200")
with warnings.catch_warnings():
# Silence selinium complaining about binary_location like a fucking nigger faggot
warnings.filterwarnings("ignore", category=DeprecationWarning)
driver = webdriver.Chrome(ChromeDriverManager().install(), options=options)
for query, query_max_page in DESU_EMAIL_DOMAINS.items():
if query_max_page > 0:
if args.pages < query_max_page:
max_page = args.pages
else:
max_page = query_max_page
else:
max_page = args.pages
logger.info(f'Searching {max_page} pages for {query}')
for i in range(1, max_page):
url = f'https://desuarchive.org/g/search/text/%22%40{query}%22/page/{i}/'
driver.get(url)
source = driver.page_source
soup = BeautifulSoup(source, 'html.parser')
text = soup.find_all("div", {"class": "text"})
for item in text:
matches = re.findall(rf"[^ \n>(]*?(?:@{'|'.join(DESU_EMAIL_DOMAINS)})", str(item))
for m in matches:
email = clean_email(m)
if email not in cached_emails:
logger.info(f'NEW: {email}')
found_emails.add(email)
driver.quit()
diff = len(found_emails.difference(cached_emails))
logger.info(f'Found {len(found_emails)} emails, added {diff} new emails.')
emails_file_content = found_emails | cached_emails | found_4chan
sorted_file_content = sorted(emails_file_content)
with open(cached_emails_file, 'w') as file:
for email in sorted_file_content:
cleaned = clean_email(email)
if cleaned:
file.write(cleaned + '\n')
logger.debug('Wrote to cache file.')
if args.action in ['send', 'both']:
emails_file_content = found_emails | cached_emails | found_4chan
email_list = [x for x in [*emails_file_content, *emails_file_content, *emails_file_content] if x != '']
random.shuffle(email_list)
if args.test:
if args.target:
args.target = 'yourtestemail@shit.fuck'
else:
# Used to load a million email addresses to send to, but the script was modified to troll LGA and Sam Cole.
email_list = ['youremailaddress@shit.fuck']
email_queue = Queue(maxsize=0)
logger.info(f'Starting {args.workers_per_email} workers and logging into SMTP...')
for i in range(args.workers_per_email):
worker = EmailWorker(email_queue, args.sender if not args.send_lga else 'sam.cole.404media@bestmail.us') # 'lgaburner@cluemail.com')
worker.start()
dir_list = list(Path(image_dir).rglob("*.*"))
if not args.target:
logger.info(f'Sending {len(email_list)} emails...')
for to_email in email_list:
if not args.send_lga:
d = (to_email, next(SUBJECT_PHRASES_CYCLE), NORMAL_EMAIL, select_random_file(image_dir, dir_list))
else:
d = (to_email, 'Chub.ai Interview Request', FAKE_LGA, None) # 'Invite to my discord and a token to our proxy'
email_queue.put(d)
else:
logger.info(f'Sending {args.target_count} emails...')
for i in range(args.target_count):
email_queue.put((args.target, next(SUBJECT_PHRASES_CYCLE), NORMAL_EMAIL, select_random_file(image_dir, dir_list)))
email_queue.join()
if __name__ == '__main__':
main()