aoc-get/download.py

234 lines
6.6 KiB
Python
Raw Normal View History

2021-11-30 04:19:30 +00:00
import dataclasses
import datetime
import functools
import logging
import os
import pathlib
import sys
import time
import urllib.parse
from typing import List
import random
import bs4
import click
import coloredlogs
import dateutil.relativedelta
import requests
import yaspin
# Logger used by every function in this module; its level and output format
# are configured by setup_logs().
LOG = logging.getLogger(__name__)
@dataclasses.dataclass
class PuzzleDate:
    """Identify one Advent of Code puzzle by event year and day of December.

    Both alternate constructors work from a naive datetime, by default the
    machine's local clock.
    NOTE(review): puzzles presumably unlock on the event's own timezone rather
    than the local one -- confirm before relying on this close to midnight.
    """

    # Event year, as it appears in the puzzle URL.
    year: int
    # Day of December the puzzle belongs to.
    day: int

    # Day number of the last puzzle of an event. Deliberately unannotated so
    # that the dataclass machinery does not turn it into a field.
    _LAST_PUZZLE_DAY = 25

    @classmethod
    def get_latest(cls, now: "datetime.datetime | None" = None) -> "PuzzleDate":
        """Return the most recent puzzle date relative to *now*.

        Args:
            now: Moment to evaluate against; defaults to the current local
                time.

        Returns:
            The puzzle of the current day during 1-25 December, day 25 of the
            current year during 26-31 December, and day 25 of the previous
            year in any other month (a warning is logged in that last case).
        """
        if now is None:
            now = datetime.datetime.now()

        if now.month != 12:
            # it's not AoC time yet! The last one is gonna be christmas of last year
            year = now.year - 1
            day = cls._LAST_PUZZLE_DAY
            LOG.warning(
                f"It's not Advent of Code time yet - defaulting to day {day} of the {year} event"
            )
            return cls(year=year, day=day)

        # Without the clamp, 26-31 December would name puzzles past the last
        # day of the event.
        return cls(year=now.year, day=min(now.day, cls._LAST_PUZZLE_DAY))

    @classmethod
    def get_next(cls, now: "datetime.datetime | None" = None) -> "PuzzleDate":
        """Return the date of the next puzzle after *now*.

        Args:
            now: Moment to evaluate against; defaults to the current local
                time.

        Returns:
            Day 1 of the current year before December, day 1 of the following
            year from 25 December onwards, and otherwise the calendar day
            after *now*.
        """
        if now is None:
            now = datetime.datetime.now()

        if now.month < 12:
            return cls(year=now.year, day=1)
        if now.day >= cls._LAST_PUZZLE_DAY:
            # This year's event is over; the next puzzle opens next December.
            return cls(year=now.year + 1, day=1)

        tomorrow = now + datetime.timedelta(days=1)
        return cls(year=tomorrow.year, day=tomorrow.day)
def get_puzzle_url(date: PuzzleDate) -> str:
    """Build the URL of the puzzle page for *date*.

    The path segments (year, the literal "day", day number) are appended to
    the site root one at a time. A trailing slash is forced onto the partial
    URL before each join so that ``urljoin`` appends the segment instead of
    replacing the last one.
    """
    url = "https://adventofcode.com/"
    for segment in (str(date.year), "day", str(date.day)):
        url = urllib.parse.urljoin(ensure_ends_with(url, "/"), segment)
    return url
def get_puzzle_source(date: PuzzleDate, timeout: float = 30.0) -> str:
    """Fetch the HTML of the puzzle page for *date*.

    Args:
        date: Puzzle whose page is fetched.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the request could block indefinitely on an unresponsive
            server.

    Returns:
        The response body as text.

    Raises:
        requests.exceptions.HTTPError: The server answered with an error
            status.
        requests.exceptions.Timeout: The server did not answer within
            *timeout* seconds.
    """
    url = get_puzzle_url(date)
    LOG.debug(f"Fetching puzzle source from {url}")
    res = requests.get(url, timeout=timeout)
    res.raise_for_status()
    return res.text
def extract_sample_inputs(puzzle_source: str) -> List[str]:
    """Collect the text of the code snippets embedded in a puzzle page.

    Every ``<pre>`` element that contains a ``<code>`` element contributes
    the text of that ``<code>`` element, in document order.

    Args:
        puzzle_source: HTML of the puzzle page.

    Returns:
        The extracted snippets; empty when the page has none.
    """
    LOG.debug("Extracting puzzle inputs from puzzle source")
    soup = bs4.BeautifulSoup(puzzle_source, "html.parser")

    samples: List[str] = []
    for pre in soup.find_all("pre"):
        code = pre.code
        if code:
            samples.append(code.get_text())

    LOG.info(f"Found {len(samples)} sample input(s)")
    return samples
def ensure_ends_with(s: str, c: str) -> str:
    """Return *s* itself if it already ends with *c*, otherwise *s* with *c* appended."""
    return s if s.endswith(c) else s + c
def download_sample_inputs(date: PuzzleDate, output_dir: pathlib.Path):
    """Download the puzzle page for *date* and save each sample input found in it.

    Samples are written to ``sample-1.txt``, ``sample-2.txt``, ... inside
    *output_dir*, each guaranteed to end with a newline. *output_dir* is not
    created here, so it must already exist.

    Args:
        date: Puzzle whose samples are downloaded.
        output_dir: Directory that receives the sample files.
    """
    LOG.debug("Downloading sample inputs")
    sample_inputs = extract_sample_inputs(get_puzzle_source(date))

    for i, sample_input in enumerate(sample_inputs, start=1):
        path = output_dir / f"sample-{i}.txt"
        # Terminate with "\n" rather than os.linesep: the file is opened in
        # text mode, which already translates "\n" to the platform line
        # ending, so os.linesep would produce a doubled terminator on Windows.
        data_to_write = ensure_ends_with(sample_input, "\n")
        with open(path, "w", encoding="utf-8") as sample_file:
            sample_file.write(data_to_write)
        LOG.debug(f"Wrote sample input {i} to {path}")

    LOG.info(f"Downloaded {len(sample_inputs)} sample input(s) successfully")
def fetch_input(date: PuzzleDate, token: str, timeout: float = 30.0) -> str:
    """Fetch the personal puzzle input for *date*.

    Args:
        date: Puzzle whose input is fetched.
        token: Session token, sent to the server as the ``session`` cookie.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout the request could block indefinitely on an unresponsive
            server.

    Returns:
        The response body as text.

    Raises:
        requests.exceptions.HTTPError: The server answered with an error
            status.
        requests.exceptions.Timeout: The server did not answer within
            *timeout* seconds.
    """
    puzzle_url = get_puzzle_url(date)
    # The input lives at "<puzzle url>/input"; the trailing slash makes
    # urljoin append the segment instead of replacing the day number.
    input_url = urllib.parse.urljoin(ensure_ends_with(puzzle_url, "/"), "input")
    LOG.debug(f"Fetching puzzle input from {input_url}")
    res = requests.get(input_url, cookies={"session": token}, timeout=timeout)
    res.raise_for_status()
    return res.text
def download_input(date: PuzzleDate, token: str, output_dir: pathlib.Path):
    """Download the personal puzzle input for *date* into ``input.txt``.

    The file is written inside *output_dir* and is guaranteed to end with a
    newline. *output_dir* is not created here, so it must already exist.

    Args:
        date: Puzzle whose input is downloaded.
        token: Session token used to authenticate the request.
        output_dir: Directory that receives ``input.txt``.
    """
    LOG.debug("Downloading puzzle input")
    puzzle_input = fetch_input(date, token)
    # Terminate with "\n" rather than os.linesep: the file is opened in text
    # mode, which already translates "\n" to the platform line ending, so
    # os.linesep would produce a doubled terminator on Windows.
    data_to_write = ensure_ends_with(puzzle_input, "\n")
    path = output_dir / "input.txt"
    with open(path, "w", encoding="utf-8") as input_file:
        input_file.write(data_to_write)
    LOG.debug(f"Wrote puzzle input to {path}")
    LOG.info("Download puzzle inputs successfully")
def get_token_from_file(path: pathlib.Path) -> str:
    """Read the session token stored in the file at *path*.

    Leading and trailing whitespace (including the final newline) is
    stripped from the file content.

    Args:
        path: Location of the token file.

    Returns:
        The token, as a single non-empty line.

    Raises:
        OSError: The file cannot be opened (for example
            ``FileNotFoundError`` when it does not exist).
        ValueError: The file is empty or blank, or its content spans more
            than one line.
    """
    with open(path) as token_file:
        token = token_file.read().strip()

    # An empty token would only fail later, as a confusing HTTP error.
    if not token:
        raise ValueError("Token file is empty")
    if "\n" in token:
        raise ValueError("Token file probably does not containt token; it has newlines")
    return token
def download_latest_inputs(output_dir: pathlib.Path, token: str):
    """Download the sample inputs and the puzzle input of the latest puzzle.

    Any request failure is logged together with the step that failed, and
    the process then exits with status 1.

    Args:
        output_dir: Directory that receives the downloaded files.
        token: Session token used to fetch the personal puzzle input.
    """
    date = PuzzleDate.get_latest()
    # Tracks which step is running so that the error message can name it.
    step = "download sample inputs"
    try:
        download_sample_inputs(date, output_dir)
        step = "download input"
        download_input(date, token, output_dir)
    except requests.exceptions.RequestException as err:
        # RequestException is the base class of HTTPError and also covers
        # connection failures and timeouts, which would otherwise escape as
        # a traceback.
        LOG.error(f"Failed to {step}: {err}")
        # Only errors raised after a response was received carry one.
        if err.response is not None:
            LOG.debug(f"Response body: {err.response.text}")
        sys.exit(1)
def wait_for_next_puzzle():
    """Sleep until midnight at the start of the next puzzle's day.

    The target is midnight of that December day as a naive datetime, so it
    is measured on the local clock. A random extra delay of up to 1.5
    seconds is added, and a spinner is shown while waiting.
    NOTE(review): puzzles presumably unlock on the event's own timezone --
    confirm whether local midnight is the intended target.
    """
    upcoming = PuzzleDate.get_next()
    unlock_at = datetime.datetime(year=upcoming.year, month=12, day=upcoming.day)
    LOG.info(
        f"Next puzzle is at {unlock_at.isoformat()}. Waiting for it to be ready..."
    )

    delay = unlock_at.timestamp() - datetime.datetime.now().timestamp()
    # Wait some extra amount of time to not be rude to eric :(
    jitter = random.randint(0, 1500) / 1000
    delay = delay + jitter
    LOG.debug(f"Waiting {delay} seconds before fetching")

    with yaspin.yaspin(text="Waiting for the next puzzle...") as spinner:
        time.sleep(delay)
        spinner.ok("🎄 ")
def setup_logs(verbose: bool):
    """Install coloured log output at DEBUG level when *verbose*, else INFO."""
    if verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO
    coloredlogs.install(
        level=level,
        fmt="%(asctime)s aoc-get[%(process)d] %(levelname)s %(message)s",
    )
@click.command()
@click.option("-v", "--verbose", is_flag=True)
@click.option(
    "--token", "passed_token", type=str, help="The advent of code session token."
)
@click.option(
    "-o",
    "output_dir",
    type=pathlib.Path,
    help="The output location of the puzzle inputs",
    default=".",
)
@click.option(
    "--token-file",
    "token_path",
    type=pathlib.Path,
    default="~/.local/advent-of-code/session",
    help="The location of the advent of code session token. Ignored if --token is provided.",
)
@click.option("--wait", is_flag=True, help="Wait until new puzzle inputs are ready")
def main(
    verbose: bool,
    passed_token: str,
    token_path: pathlib.Path,
    output_dir: pathlib.Path,
    wait: bool,
):
    """Download the sample inputs and puzzle input of the latest puzzle."""
    # The docstring above is kept to one line because click shows it as the
    # command's --help text.

    # Configure logging before anything logs; otherwise the messages emitted
    # while resolving the token are produced before the requested level and
    # format are installed.
    setup_logs(verbose)

    token = passed_token
    if not token and not token_path:
        LOG.error("no token file given")
        sys.exit(1)
    elif not token:
        # The default token path starts with "~", which open() does not
        # expand by itself.
        expanded_token_path = pathlib.Path(os.path.expanduser(token_path))
        LOG.debug(f"No token passed. Opening token from {token_path}")
        try:
            token = get_token_from_file(expanded_token_path)
        except (OSError, ValueError) as err:
            # OSError covers a missing file as well as an unreadable one or
            # a directory in its place.
            LOG.error(f"Failed to read token file: {err}")
            sys.exit(1)

    if wait:
        wait_for_next_puzzle()
    download_latest_inputs(output_dir, token)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()