aoc-get/download.py

245 lines
7.1 KiB
Python

import dataclasses
import datetime
import functools
import logging
import os
import pathlib
import sys
import time
import urllib.parse
from typing import List, Optional
import random
import bs4
import click
import coloredlogs
import dateutil.relativedelta
import requests
import requests.utils
import yaspin
# Module-level logger used by every function in this file. Its output format
# and level are set up when setup_logs() calls coloredlogs.install().
LOG = logging.getLogger(__name__)
# Replace requests' default User-Agent helper so that requests sent through
# the `requests` library identify this tool and its author.
# NOTE(review): this monkeypatches a library function at import time, so it
# must stay above any code that issues a request.
requests.utils.default_user_agent = lambda: "aoc-get by nick@ollien.com"
@dataclasses.dataclass
class PuzzleDate:
    """The (year, day) pair that identifies one Advent of Code puzzle."""

    # Year of the event the puzzle belongs to.
    year: int
    # Day of the event (day of the month in December).
    day: int

    @classmethod
    def get_latest(cls, now: Optional[datetime.datetime] = None) -> "PuzzleDate":
        """Return the most recent puzzle date as of ``now``.

        Args:
            now: The moment to evaluate against. Defaults to
                ``datetime.datetime.now()`` (naive local time).

        Returns:
            During December, the current year with the current day of the
            month capped at 25 (the last day of the event). In any other
            month, day 25 of the previous year; a warning is logged in that
            case.
        """
        if now is None:
            now = datetime.datetime.now()

        # it's not AoC time yet! The last one is gonna be christmas of last year
        if now.month != 12:
            year = now.year - 1
            day = 25
            LOG.warning(
                f"It's not Advent of Code time yet - defaulting to day {day} of the {year} event"
            )
            return cls(year=year, day=day)

        # Dec 26-31 have no puzzle of their own; the latest one is still day 25.
        return cls(year=now.year, day=min(now.day, 25))

    @classmethod
    def get_next(cls, now: Optional[datetime.datetime] = None) -> "PuzzleDate":
        """Return the date of the next puzzle to be released after ``now``.

        Args:
            now: The moment to evaluate against. Defaults to
                ``datetime.datetime.now()`` (naive local time).

        Returns:
            Day 1 of this year's event before December, day 1 of next year's
            event from December 25 onwards, and otherwise tomorrow's date.
        """
        if now is None:
            now = datetime.datetime.now()

        if now.month < 12:
            return cls(year=now.year, day=1)
        if now.day >= 25:
            # This year's event has released its last puzzle.
            return cls(year=now.year + 1, day=1)

        # Here we are in December before the 25th, so tomorrow is still in
        # December of the same year.
        tomorrow = now + datetime.timedelta(days=1)
        return cls(year=tomorrow.year, day=tomorrow.day)
def get_puzzle_url(date: PuzzleDate) -> str:
    """Build the URL of the puzzle page for ``date``.

    The URL is assembled one path segment at a time
    (``<domain>/<year>/day/<day>``). A trailing slash is added to the partial
    URL before each join so that ``urljoin`` appends the segment instead of
    replacing the last one.
    """
    url = "https://adventofcode.com/"
    for segment in (str(date.year), "day", str(date.day)):
        base = ensure_ends_with(url, "/")
        url = urllib.parse.urljoin(base, segment)

    return url
def get_puzzle_source(date: PuzzleDate, timeout: float = 30.0) -> str:
    """Fetch the HTML of the puzzle page for ``date``.

    Args:
        date: The puzzle to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The response body as text.

    Raises:
        requests.exceptions.HTTPError: If ``raise_for_status`` rejects the
            response status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = get_puzzle_url(date)
    LOG.debug(f"Fetching puzzle source from {url}")
    res = requests.get(url, timeout=timeout)
    res.raise_for_status()

    return res.text
def extract_sample_inputs(puzzle_source: str) -> List[str]:
    """Collect the text of every ``<pre>`` element that contains a ``<code>`` child.

    Args:
        puzzle_source: HTML of a puzzle page.

    Returns:
        The text of each matching ``<code>`` element, in document order.
    """
    LOG.debug("Extracting puzzle inputs from puzzle source")
    soup = bs4.BeautifulSoup(puzzle_source, "html.parser")

    found: List[str] = []
    for pre in soup.find_all("pre"):
        code = pre.code
        # <pre> blocks without a nested <code> are skipped.
        if code:
            found.append(code.get_text())

    LOG.info(f"Found {len(found)} sample input(s)")
    return found
def ensure_ends_with(s: str, c: str) -> str:
    """Return ``s`` unchanged if it already ends with ``c``, else ``s + c``."""
    return s if s.endswith(c) else s + c
def download_sample_inputs(date: PuzzleDate, output_dir: pathlib.Path):
    """Fetch the puzzle page for ``date`` and save each sample input to a file.

    Samples are written to ``output_dir`` as ``sample-1.txt``,
    ``sample-2.txt``, ... in the order they appear on the page. Each file is
    guaranteed to end with a newline.

    Args:
        date: The puzzle whose samples should be downloaded.
        output_dir: Directory to write the files into.

    Raises:
        requests.exceptions.HTTPError: Propagated from ``get_puzzle_source``.
        OSError: If a sample file cannot be written.
    """
    LOG.debug("Downloading sample inputs")
    puzzle_source = get_puzzle_source(date)
    sample_inputs = extract_sample_inputs(puzzle_source)

    for i, sample_input in enumerate(sample_inputs, start=1):
        path = output_dir / f"sample-{i}.txt"
        # Use "\n", not os.linesep: the file is opened in text mode, which
        # already translates "\n" to the platform line ending on write.
        # Appending os.linesep would produce doubled or broken endings on
        # Windows.
        data_to_write = ensure_ends_with(sample_input, "\n")
        with open(path, "w", encoding="utf-8") as sample_file:
            sample_file.write(data_to_write)

        LOG.debug(f"Wrote sample input {i} to {path}")

    LOG.info(f"Downloaded {len(sample_inputs)} sample input(s) successfully")
def fetch_input(date: PuzzleDate, token: str, timeout: float = 30.0) -> str:
    """Fetch the puzzle input for ``date``.

    Args:
        date: The puzzle whose input should be fetched.
        token: Session token, sent as the ``session`` cookie.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The response body as text.

    Raises:
        requests.exceptions.HTTPError: If ``raise_for_status`` rejects the
            response status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    puzzle_url = get_puzzle_url(date)
    # The input lives at "<puzzle url>/input"; the trailing slash makes
    # urljoin append rather than replace the final path segment.
    input_url = urllib.parse.urljoin(ensure_ends_with(puzzle_url, "/"), "input")
    LOG.debug(f"Fetching puzzle input from {input_url}")
    res = requests.get(input_url, cookies={"session": token}, timeout=timeout)
    res.raise_for_status()

    return res.text
def download_input(date: PuzzleDate, token: str, output_dir: pathlib.Path):
    """Fetch the puzzle input for ``date`` and write it to ``input.txt``.

    The file is written into ``output_dir`` and is guaranteed to end with a
    newline.

    Args:
        date: The puzzle whose input should be downloaded.
        token: Session token passed through to ``fetch_input``.
        output_dir: Directory to write ``input.txt`` into.

    Raises:
        requests.exceptions.HTTPError: Propagated from ``fetch_input``.
        OSError: If the file cannot be written.
    """
    LOG.debug("Downloading puzzle input")
    puzzle_input = fetch_input(date, token)
    # Use "\n", not os.linesep: the file is opened in text mode, which already
    # translates "\n" to the platform line ending on write. Appending
    # os.linesep would produce doubled or broken endings on Windows.
    data_to_write = ensure_ends_with(puzzle_input, "\n")
    path = output_dir / "input.txt"
    with open(path, "w", encoding="utf-8") as input_file:
        input_file.write(data_to_write)

    LOG.debug(f"Wrote puzzle input to {path}")
    LOG.info("Download puzzle inputs successfully")
def get_token_from_file(path: pathlib.Path) -> str:
    """Read a session token from the file at ``path``.

    Leading and trailing whitespace is stripped from the file contents.

    Args:
        path: Location of the token file.

    Returns:
        The stripped token.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
        ValueError: If the file is empty (after stripping) or still contains
            a newline, which suggests it does not hold a single token.
    """
    with open(path) as token_file:
        token = token_file.read().strip()

    # An empty token would otherwise be sent to the server as a blank cookie.
    if not token:
        raise ValueError("Token file is empty")

    if "\n" in token:
        raise ValueError(
            "Token file probably does not contain a token; it has newlines"
        )

    return token
def wait_for_next_puzzle():
    """Block until the next puzzle's release time, showing a spinner meanwhile.

    The target is midnight (a naive ``datetime``) on the December day returned
    by ``PuzzleDate.get_next()``, plus a random delay of up to 1.5 seconds.

    NOTE(review): the target is computed in the machine's local time - confirm
    that this matches the time the site actually publishes puzzles.
    """
    upcoming = PuzzleDate.get_next()
    release = datetime.datetime(year=upcoming.year, month=12, day=upcoming.day)
    LOG.info(
        f"Next puzzle is at {release.isoformat()}. Waiting for it to be ready..."
    )

    seconds_until_release = release.timestamp() - datetime.datetime.now().timestamp()
    # A little random slack so the request does not land on the server at the
    # exact release instant - be polite to the site's maintainer.
    jitter = random.randint(0, 1500) / 1000
    delay = seconds_until_release + jitter
    LOG.debug(f"Waiting {delay} seconds before fetching")

    with yaspin.yaspin(text="Waiting for the next puzzle...") as spinner:
        time.sleep(delay)
        spinner.ok("🎄 ")
def setup_logs(verbose: bool):
    """Install coloredlogs at DEBUG level when ``verbose`` is set, else INFO."""
    if verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO

    log_format = "%(asctime)s aoc-get[%(process)d] %(levelname)s %(message)s"
    coloredlogs.install(level=level, fmt=log_format)
class FetchCommand(click.Command):
    """Click command that requires ``--year`` and ``--day`` to be given together."""

    def invoke(self, ctx: click.Context):
        """Validate the year/day pairing, then run the command as usual.

        Raises:
            click.ClickException: If exactly one of ``year`` and ``day`` was
                supplied.
        """
        year = ctx.params.get("year")
        day = ctx.params.get("day")
        # Reject the case where exactly one of the two is present. The
        # previous check tested "year without day" twice, so "day without
        # year" slipped through.
        if bool(year) != bool(day):
            raise click.ClickException("day and year must be specified together")

        return super().invoke(ctx)
@click.command(cls=FetchCommand)
@click.option("-v", "--verbose", is_flag=True)
@click.option(
    "--token", "passed_token", type=str, help="The advent of code session token."
)
@click.option(
    "-o",
    "output_dir",
    type=pathlib.Path,
    help="The output location of the puzzle inputs",
    default=".",
)
@click.option(
    "--token-file",
    "token_path",
    type=pathlib.Path,
    default="~/.local/advent-of-code/session",
    help="The location of the advent of code session token. Ignored if --token is provided.",
)
@click.option("--wait", is_flag=True, help="Wait until new puzzle inputs are ready")
@click.option("--year", type=int)
@click.option("--day", type=int)
def main(
    verbose: bool,
    passed_token: str,
    token_path: pathlib.Path,
    output_dir: pathlib.Path,
    wait: bool,
    day: Optional[int],
    year: Optional[int],
):
    """Download the sample inputs and puzzle input for an Advent of Code puzzle."""
    # NOTE: click displays the docstring above as the command's --help text,
    # so it is kept to a single user-facing sentence.
    #
    # Flow: resolve the session token (from --token, else from the token
    # file), optionally wait for the next puzzle, pick the date (explicit
    # --year/--day, else the latest puzzle), then download the samples and the
    # input. Any failure is logged and exits with status 1.

    # Configure logging first so that messages emitted while resolving the
    # token use the configured format and honour --verbose.
    setup_logs(verbose)

    token = passed_token
    if not token and not token_path:
        LOG.error("no token file given")
        sys.exit(1)
    elif not token:
        expanded_token_path = pathlib.Path(os.path.expanduser(token_path))
        LOG.debug(f"No token passed. Opening token from {token_path}")
        try:
            token = get_token_from_file(expanded_token_path)
        except (OSError, ValueError) as err:
            # OSError covers a missing file as well as permission problems or
            # the path being a directory.
            LOG.error(f"Failed to read token file: {err}")
            sys.exit(1)

    if wait:
        wait_for_next_puzzle()

    specified_date = PuzzleDate(year=year, day=day) if day and year else None
    date_to_fetch = specified_date or PuzzleDate.get_latest()

    # `step` names the operation in progress so the error message can say
    # which one failed.
    step = "download sample inputs"
    try:
        download_sample_inputs(date_to_fetch, output_dir)
        step = "download input"
        download_input(date_to_fetch, token, output_dir)
    except requests.exceptions.RequestException as err:
        # RequestException is the base class of HTTPError and also covers
        # connection failures and timeouts, which carry no response.
        LOG.error(f"Failed to {step}: {err}")
        if err.response is not None:
            LOG.debug(f"Response body: {err.response.text}")
        sys.exit(1)


if __name__ == "__main__":
    main()