import dataclasses import datetime import functools import logging import os import pathlib import sys import time import urllib.parse from typing import List, Optional import random import bs4 import click import coloredlogs import dateutil.relativedelta import requests import requests.utils import yaspin LOG = logging.getLogger(__name__) requests.utils.default_user_agent = lambda: "aoc-get by nick@ollien.com" @dataclasses.dataclass class PuzzleDate: year: int day: int @classmethod def get_latest(cls) -> "PuzzleDate": now = datetime.datetime.now() year = now.year day = now.day # it's not AoC time yet! The last one is gonna be christmas of last year if now.month != 12: year -= 1 day = 25 LOG.warning( f"It's not Advent of Code time yet - defaulting to day {day} of the {year} event" ) return cls(year=year, day=day) @classmethod def get_next(cls) -> "PuzzleDate": now = datetime.datetime.now() if now.month < 12: return cls(year=now.year, day=1) elif now.month == 12 and now.day >= 25: return cls(year=now.year + 1, day=1) tomorrow_now = now + dateutil.relativedelta.relativedelta(days=1) return cls(year=tomorrow_now.year, day=tomorrow_now.day) def get_puzzle_url(date: PuzzleDate) -> str: DOMAIN = "https://adventofcode.com/" return functools.reduce( lambda total, component: urllib.parse.urljoin( ensure_ends_with(total, "/"), component ), (DOMAIN, str(date.year), "day", str(date.day)), ) def get_puzzle_source(date: PuzzleDate) -> str: url = get_puzzle_url(date) LOG.debug(f"Fetching puzzle source from {url}") res = requests.get(url) res.raise_for_status() return res.text def extract_sample_inputs(puzzle_source: str) -> List[str]: LOG.debug("Extracting puzzle inputs from puzzle source") souped_puzzle_source = bs4.BeautifulSoup(puzzle_source, "html.parser") sample_inputs = [ pre_tag.code.get_text() for pre_tag in souped_puzzle_source.find_all("pre") if pre_tag.code ] LOG.info(f"Found {len(sample_inputs)} sample input(s)") return sample_inputs def ensure_ends_with(s: str, c: str) -> str: if not s.endswith(c): return s + c else: return s def download_sample_inputs(date: PuzzleDate, output_dir: pathlib.Path): LOG.debug("Downloading sample inputs") puzzle_source = get_puzzle_source(date) sample_inputs = extract_sample_inputs(puzzle_source) for i, sample_input in enumerate(sample_inputs, start=1): filename = f"sample-{i}.txt" path = output_dir / filename data_to_write = ensure_ends_with(sample_input, os.linesep) with open(path, "w") as sample_file: sample_file.write(data_to_write) LOG.debug(f"Wrote sample input {i} to {path}") LOG.info(f"Downloaded {len(sample_inputs)} sample input(s) successfully") def fetch_input(date: PuzzleDate, token: str) -> str: puzzle_url = get_puzzle_url(date) input_url = urllib.parse.urljoin(ensure_ends_with(puzzle_url, "/"), "input") LOG.debug(f"Fetching puzzle input from {input_url}") res = requests.get(input_url, cookies={"session": token}) res.raise_for_status() return res.text def download_input(date: PuzzleDate, token: str, output_dir: pathlib.Path): LOG.debug("Downloading puzzle input") puzzle_input = fetch_input(date, token) data_to_write = ensure_ends_with(puzzle_input, os.linesep) path = output_dir / "input.txt" with open(path, "w") as input_file: input_file.write(data_to_write) LOG.debug(f"Wrote puzzle input to {path}") LOG.info("Download puzzle inputs successfully") def get_token_from_file(path: pathlib.Path) -> str: with open(path) as token_file: token = token_file.read().strip() if token.find("\n") != -1: raise ValueError("Token file probably does not containt token; it has newlines") return token def wait_for_next_puzzle(): next_date = PuzzleDate.get_next() next_datetime = datetime.datetime(year=next_date.year, month=12, day=next_date.day) LOG.info( f"Next puzzle is at {next_datetime.isoformat()}. Waiting for it to be ready..." ) time_to_wait = next_datetime.timestamp() - datetime.datetime.now().timestamp() # Wait some extra amount of time to not be rude to eric :( time_to_wait += random.randint(0, 1500) / 1000 LOG.debug(f"Waiting {time_to_wait} seconds before fetching") with yaspin.yaspin(text="Waiting for the next puzzle...") as spinner: time.sleep(time_to_wait) spinner.ok("🎄 ") def setup_logs(verbose: bool): coloredlogs.install( level=logging.DEBUG if verbose else logging.INFO, fmt="%(asctime)s aoc-get[%(process)d] %(levelname)s %(message)s", ) class FetchCommand(click.Command): def invoke(self, ctx: click.Context): year = ctx.params.get("year") day = ctx.params.get("day") if (year and not day) or (not day and year): raise click.ClickException("day and year must be specified together") return super().invoke(ctx) @click.command(cls=FetchCommand) @click.option("-v", "--verbose", is_flag=True) @click.option( "--token", "passed_token", type=str, help="The advent of code session token." ) @click.option( "-o", "output_dir", type=pathlib.Path, help="The output location of the puzzle inputs", default=".", ) @click.option( "--token-file", "token_path", type=pathlib.Path, default="~/.local/advent-of-code/session", help="The location of the advent of code session token. Ignored if --token is provided.", ) @click.option("--wait", is_flag=True, help="Wait until new puzzle inputs are ready") @click.option("--inputonly", "input_only", is_flag=True, help="Don't attempt to fetch sample inputs") @click.option("--year", type=int) @click.option("--day", type=int) def main( verbose: bool, passed_token: str, token_path: pathlib.Path, output_dir: pathlib.Path, wait: bool, input_only: bool, day: Optional[int], year: Optional[int], ): token = passed_token if not token and not token_path: LOG.error("no token file given") sys.exit(1) elif not token: expanded_token_path = pathlib.Path(os.path.expanduser(token_path)) LOG.debug(f"No token passed. Opening token from {token_path}") try: token = get_token_from_file(expanded_token_path) except (FileNotFoundError, ValueError) as err: LOG.error(f"Failed to read token file: {err}") sys.exit(1) setup_logs(verbose) if wait: wait_for_next_puzzle() specified_date = PuzzleDate(year=year, day=day) if day and year else None date_to_fetch = specified_date or PuzzleDate.get_latest() step = None try: if not input_only: step = "download sample inputs" download_sample_inputs(date_to_fetch, output_dir) step = "download input" download_input(date_to_fetch, token, output_dir) except requests.exceptions.HTTPError as err: LOG.error(f"Failed to {step}: {err}") LOG.debug(f"Response body: {err.response.text}") sys.exit(1) if __name__ == "__main__": main()