This repository has been archived on 2023-06-06. You can view files and clone it, but cannot push or open issues or pull requests.
reeere/board-botter.py

152 lines
6.5 KiB
Python
Executable File

import argparse
import os
import time
from random import randrange
from reeere.ai import clean_reply, do_generate
from reeere.board import get_board_info, get_thread_texts, get_threads
from reeere.fourchan import fetch_and_sort_threads, fetch_thread_by_id
from reeere.post import create_new_thread, download_file, post_data
seen_posts = set()
our_posts = set()
# TODO: reply to random post rather than the latest one
# TODO: randomly decide to post an image
def main():
parser = argparse.ArgumentParser(description='Bot a userboard')
parser.add_argument('board_link', help='ID of the board to bot.')
parser.add_argument('--reply-all-start', action='store_true', help='Reply to all threads on script start.')
parser.add_argument('--roll-random-self-reply', required=False, help='Randomly decide to reply to our own posts. This must be a number. A dice is rolled this number big to decide.')
parser.add_argument('--random-reply-op', action='store_true', help='Randomly decide to reply to the OP instead of a poster.')
args = parser.parse_args()
if args.reply_all_start:
start_timestamp = 0
else:
start_timestamp = time.time()
print('Target board:', args.board_link)
board_info = get_board_info(args.board_link)
if not board_info:
print('Board not found:', args.board_link)
quit(1)
while True:
print('=================================')
print('Scanning for new posts...')
threads = get_threads(args.board_link)
for thread in threads:
# Threads without any replies
if not len(thread['replies']) and thread['date'] > start_timestamp and thread['id'] not in seen_posts:
if thread['id'] in our_posts and args.roll_random_self_reply:
chance = randrange(args.roll_random_self_reply)
if chance != args.roll_random_self_reply - 1:
print('Not replying to our own thread', thread['id'], f'{chance}/{args.roll_random_self_reply}')
continue
else:
print('Replying to our thread:', thread['id'])
print('Replying to thread', thread['id'])
print('User posted:\n', clean_reply(thread['text']))
print('\n====')
print('We will post:\n')
context = [clean_reply(thread['text'])]
our_reply = do_generate(context, thread['id'])
if not our_reply:
print('Failed to generate a reply, AI model was shit.')
continue
print(our_reply)
print('\n====\nPosting:')
time.sleep(30)
r = post_data(our_reply, thread['id'], args.board_link)
print(r.status_code, r.text)
seen_posts.add(r.json()['post']['id'])
our_posts.add(thread['id'])
time.sleep(60)
print('=================================')
else:
reply = thread['replies'][-1]
if reply['date'] > start_timestamp and reply['id'] not in seen_posts:
if thread['id'] in our_posts and args.roll_random_self_reply:
chance = randrange(args.roll_random_self_reply)
if chance != args.roll_random_self_reply - 1:
print('Not replying to our own post', thread['id'], f'{chance}/{args.roll_random_self_reply}')
continue
else:
print('Replying to our post:', thread['id'])
if randrange(2) == 1:
print('Replying to OP instead of poster')
context = thread['text']
print('Replying to OP', thread['id'])
print('User posted:\n', clean_reply(thread['text']))
print('\n====')
print('We will post:\n')
item_id = thread['id']
else:
print('Replying to post', reply['id'], 'in thread', thread['id'])
print('User posted:\n', reply['text'])
print('\n====')
print('We will post:\n')
context = get_thread_texts(thread)
item_id = reply['id']
our_reply = do_generate(context, item_id)
if not our_reply:
print('Failed to generate a reply, AI model was shit.')
continue
print(our_reply)
print('\n====\nPosting:')
time.sleep(30)
r = post_data(our_reply, thread['id'], args.board_link)
print(r.status_code, r.text)
seen_posts.add(r.json()['post']['id'])
time.sleep(60)
print('=================================')
if args.reply_all_start:
start_timestamp = time.time()
time.sleep(60)
new_thread_roll = randrange(5)
if new_thread_roll == 1:
print('Creating new thread:')
threads = fetch_and_sort_threads('pol')
pol_comment_text = None
pol_comment = None
for i in range(10):
pol_thread = threads[i]
pol_thread_replies = fetch_thread_by_id('pol', pol_thread['no'])
pol_comment = pol_thread_replies[0]
if 'com' not in pol_comment.keys():
pol_comment_text = None
print('Skipping pol thread', pol_thread['no'])
continue
pol_comment_text = clean_reply(pol_comment['com'])
if '>>' in pol_comment_text:
pol_comment_text = None
print('Skipping pol thread', pol_thread['no'])
continue
else:
break
if not pol_comment_text:
print('Failed to find a suitable pol thread')
break
print(pol_comment_text)
thread_image = download_file(pol_comment['image_url'])
time.sleep(30)
post_request = create_new_thread(thread_image, args.board_link, pol_comment_text)
print(post_request.status_code, post_request.text)
os.remove(thread_image)
time.sleep(90)
time.sleep(60)
if __name__ == "__main__":
main()