aoc-get/download.py

320 lines
9.0 KiB
Python

import dataclasses
import datetime
import functools
import logging
import os
import pathlib
import sys
import time
import urllib.parse
from typing import Iterable, List, Optional
import random
import bs4
import click
import coloredlogs
import dateutil.relativedelta
import requests
import requests.utils
import yaspin
LOG = logging.getLogger(__name__)
requests.utils.default_user_agent = lambda: "aoc-get by nick@ollien.com"
@dataclasses.dataclass
class PuzzleDate:
year: int
day: int
@classmethod
def get_latest(cls) -> "PuzzleDate":
now = datetime.datetime.now()
year = now.year
day = now.day
# it's not AoC time yet! The last one is gonna be christmas of last year
if now.month != 12:
year -= 1
day = 25
LOG.warning(
f"It's not Advent of Code time yet - defaulting to day {day} of the {year} event"
)
return cls(year=year, day=day)
@classmethod
def get_next(cls) -> "PuzzleDate":
now = datetime.datetime.now()
if now.month < 12:
return cls(year=now.year, day=1)
elif now.month == 12 and now.day >= 25:
return cls(year=now.year + 1, day=1)
tomorrow_now = now + dateutil.relativedelta.relativedelta(days=1)
return cls(year=tomorrow_now.year, day=tomorrow_now.day)
def get_puzzle_url(date: PuzzleDate) -> str:
DOMAIN = "https://adventofcode.com/"
return functools.reduce(
lambda total, component: urllib.parse.urljoin(
ensure_ends_with(total, "/"), component
),
(DOMAIN, str(date.year), "day", str(date.day)),
)
def get_puzzle_source(date: PuzzleDate, token: str) -> str:
url = get_puzzle_url(date)
LOG.debug(f"Fetching puzzle source from {url}")
res = requests.get(url, cookies=build_cookies(token))
res.raise_for_status()
return res.text
def build_cookies(token: str) -> dict[str, str]:
return {"session": token}
def extract_sample_inputs(puzzle_source: str) -> List[str]:
LOG.debug("Extracting puzzle inputs from puzzle source")
souped_puzzle_source = bs4.BeautifulSoup(puzzle_source, "html.parser")
sample_inputs = [
pre_tag.code.get_text()
for pre_tag in souped_puzzle_source.find_all("pre")
if pre_tag.code
]
return sample_inputs
def ensure_ends_with(s: str, c: str) -> str:
if not s.endswith(c):
return s + c
else:
return s
def find_sample_inputs(date: PuzzleDate, token: str) -> List[str]:
LOG.debug("Downloading sample inputs")
puzzle_source = get_puzzle_source(date, token)
sample_inputs = extract_sample_inputs(puzzle_source)
LOG.info(f"Found {len(sample_inputs)} sample input(s)")
return sample_inputs
def interactively_filter_sample_inputs(sample_inputs: List[str]) -> Iterable[str]:
def prompt_for_choice():
while True:
choice = input("Download as sample input? [Y/n] ")
if choice.lower() == "y" or not choice:
return True
elif choice.lower() == "n":
return False
for sample_input in sample_inputs:
print(sample_input, end="\n\n")
should_use = prompt_for_choice()
if should_use:
yield sample_input
def save_sample_inputs(sample_inputs: Iterable[str], output_dir: pathlib.Path):
num_inputs = 0
for i, sample_input in enumerate(sample_inputs, start=1):
filename = f"sample-{i}.txt"
path = output_dir / filename
data_to_write = ensure_ends_with(sample_input, os.linesep)
with open(path, "w") as sample_file:
sample_file.write(data_to_write)
LOG.debug(f"Wrote sample input {i} to {path}")
num_inputs += 1
LOG.info(f"Downloaded {num_inputs} sample input(s) successfully")
def download_sample_inputs(
date: PuzzleDate, token: str, output_dir: pathlib.Path, *, interactive
):
inputs = find_sample_inputs(date, token)
if interactive:
inputs = interactively_filter_sample_inputs(inputs)
save_sample_inputs(inputs, output_dir)
def fetch_input(date: PuzzleDate, token: str) -> str:
puzzle_url = get_puzzle_url(date)
input_url = urllib.parse.urljoin(ensure_ends_with(puzzle_url, "/"), "input")
LOG.debug(f"Fetching puzzle input from {input_url}")
res = requests.get(input_url, cookies=build_cookies(token))
res.raise_for_status()
return res.text
def download_input(date: PuzzleDate, token: str, output_dir: pathlib.Path):
LOG.debug("Downloading puzzle input")
puzzle_input = fetch_input(date, token)
data_to_write = ensure_ends_with(puzzle_input, os.linesep)
path = output_dir / "input.txt"
with open(path, "w") as input_file:
input_file.write(data_to_write)
LOG.debug(f"Wrote puzzle input to {path}")
LOG.info("Download puzzle inputs successfully")
def get_token_from_file(path: pathlib.Path) -> str:
with open(path) as token_file:
token = token_file.read().strip()
if token.find("\n") != -1:
raise ValueError("Token file probably does not containt token; it has newlines")
return token
def wait_for_next_puzzle():
next_date = PuzzleDate.get_next()
next_datetime = datetime.datetime(year=next_date.year, month=12, day=next_date.day)
LOG.info(
f"Next puzzle is at {next_datetime.isoformat()}. Waiting for it to be ready..."
)
time_to_wait = next_datetime.timestamp() - datetime.datetime.now().timestamp()
# Wait some extra amount of time to not be rude to eric :(
time_to_wait += random.randint(0, 1500) / 1000
LOG.debug(f"Waiting {time_to_wait} seconds before fetching")
with yaspin.yaspin(text="Waiting for the next puzzle...") as spinner:
time.sleep(time_to_wait)
spinner.ok("🎄 ")
def ensure_dir(path: pathlib.Path):
try:
os.makedirs(path)
except FileExistsError:
pass
def setup_logs(verbose: bool):
coloredlogs.install(
level=logging.DEBUG if verbose else logging.INFO,
fmt="%(asctime)s aoc-get[%(process)d] %(levelname)s %(message)s",
)
class FetchCommand(click.Command):
def invoke(self, ctx: click.Context):
year = ctx.params.get("year")
day = ctx.params.get("day")
if (year and not day) or (day and not year):
raise click.ClickException("day and year must be specified together")
return super().invoke(ctx)
@click.command(cls=FetchCommand)
@click.option("-v", "--verbose", is_flag=True)
@click.option(
"--token", "passed_token", type=str, help="The advent of code session token."
)
@click.option(
"-o",
"output_dir",
type=pathlib.Path,
help="The output location of the puzzle inputs",
default=".",
)
@click.option(
"-O",
"output_dir_make",
type=pathlib.Path,
help="The output location of the puzzle inputs. Unlike -o, this will create the directory for you",
default=None,
)
@click.option(
"--token-file",
"token_path",
type=pathlib.Path,
default="~/.local/advent-of-code/session",
help="The location of the advent of code session token. Ignored if --token is provided.",
)
@click.option("--wait", is_flag=True, help="Wait until new puzzle inputs are ready")
@click.option(
"--inputonly",
"input_only",
is_flag=True,
help="Don't attempt to fetch sample inputs",
)
@click.option(
"-i",
"--interactive",
is_flag=True,
help="Interactively select sample inputs to download",
)
@click.option("--year", type=int)
@click.option("--day", type=int)
def main(
verbose: bool,
passed_token: str,
token_path: pathlib.Path,
output_dir: pathlib.Path,
output_dir_make: Optional[pathlib.Path],
wait: bool,
input_only: bool,
interactive: bool,
day: Optional[int],
year: Optional[int],
):
token = passed_token
if not token and not token_path:
LOG.error("no token file given")
sys.exit(1)
elif not token:
expanded_token_path = pathlib.Path(os.path.expanduser(token_path))
LOG.debug(f"No token passed. Opening token from {token_path}")
try:
token = get_token_from_file(expanded_token_path)
except (FileNotFoundError, ValueError) as err:
LOG.error(f"Failed to read token file: {err}")
sys.exit(1)
setup_logs(verbose)
if wait:
wait_for_next_puzzle()
if output_dir_make:
ensure_dir(output_dir_make)
output_dir = output_dir_make
specified_date = PuzzleDate(year=year, day=day) if day and year else None
date_to_fetch = specified_date or PuzzleDate.get_latest()
step = None
try:
step = "download sample inputs"
if not input_only:
download_sample_inputs(
date_to_fetch, token, output_dir, interactive=interactive
)
step = "download input"
download_input(date_to_fetch, token, output_dir)
except requests.exceptions.HTTPError as err:
LOG.error(f"Failed to {step}: {err}")
LOG.debug(f"Response body: {err.response.text}")
sys.exit(1)
if __name__ == "__main__":
main()