320 lines
9.0 KiB
Python
320 lines
9.0 KiB
Python
import dataclasses
|
|
import datetime
|
|
import functools
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
from typing import Iterable, List, Optional
|
|
import random
|
|
|
|
import bs4
|
|
import click
|
|
import coloredlogs
|
|
import dateutil.relativedelta
|
|
import requests
|
|
import requests.utils
|
|
import yaspin
|
|
|
|
# Module-level logger shared by every function in this file.
LOG = logging.getLogger(__name__)
|
|
|
|
# Override the function requests uses to build its default User-Agent string so
# that requests sent without an explicit User-Agent header carry this tool's
# name and a contact address.
requests.utils.default_user_agent = lambda: "aoc-get by nick@ollien.com"
|
|
|
|
|
|
@dataclasses.dataclass
class PuzzleDate:
    """The event year and day-of-December that identify one puzzle."""

    year: int
    day: int

    @classmethod
    def get_latest(cls, now: Optional[datetime.datetime] = None) -> "PuzzleDate":
        """Return the most recent puzzle date as of ``now``.

        Args:
            now: The moment to evaluate against. Defaults to the current
                local time (``datetime.datetime.now()``).
                NOTE(review): local time is used, not the event's own time
                zone - confirm this is intended.

        Returns:
            Day 25 of the previous year when ``now`` is outside December;
            otherwise today's day of the current year, capped at 25.
        """
        if now is None:
            now = datetime.datetime.now()

        year = now.year
        # The event has no puzzles after the 25th, so December 26-31 must
        # resolve to the final puzzle rather than a non-existent day.
        day = min(now.day, 25)

        # it's not AoC time yet! The last one is gonna be christmas of last year
        if now.month != 12:
            year -= 1
            day = 25
            LOG.warning(
                f"It's not Advent of Code time yet - defaulting to day {day} of the {year} event"
            )

        return cls(year=year, day=day)

    @classmethod
    def get_next(cls, now: Optional[datetime.datetime] = None) -> "PuzzleDate":
        """Return the date of the next puzzle to be released after ``now``.

        Args:
            now: The moment to evaluate against. Defaults to the current
                local time (``datetime.datetime.now()``).

        Returns:
            Day 1 of this year before December, day 1 of next year from
            December 25th onwards, and tomorrow's date otherwise.
        """
        if now is None:
            now = datetime.datetime.now()

        if now.month < 12:
            return cls(year=now.year, day=1)
        if now.day >= 25:
            # This year's event is over; the next puzzle opens next December.
            return cls(year=now.year + 1, day=1)

        tomorrow = now + datetime.timedelta(days=1)
        return cls(year=tomorrow.year, day=tomorrow.day)
|
|
|
|
|
|
def get_puzzle_url(date: PuzzleDate) -> str:
    """Build the puzzle page URL for ``date``.

    The result has the form ``https://adventofcode.com/<year>/day/<day>``.
    """
    DOMAIN = "https://adventofcode.com/"

    url = DOMAIN
    for component in (str(date.year), "day", str(date.day)):
        # urljoin replaces the final path segment unless the base ends in "/",
        # so the base is slash-terminated before each join.
        url = urllib.parse.urljoin(ensure_ends_with(url, "/"), component)

    return url
|
|
|
|
|
|
def get_puzzle_source(date: PuzzleDate, token: str, timeout: float = 30.0) -> str:
    """Download the HTML of the puzzle page for ``date``.

    Args:
        date: The puzzle to fetch.
        token: Session token, sent as the ``session`` cookie.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests will wait indefinitely on a stalled connection.

    Returns:
        The response body as text.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    url = get_puzzle_url(date)
    LOG.debug(f"Fetching puzzle source from {url}")

    res = requests.get(url, cookies=build_cookies(token), timeout=timeout)
    res.raise_for_status()

    return res.text
|
|
|
|
|
|
def build_cookies(token: str) -> dict[str, str]:
    """Return the cookie mapping that carries ``token`` as the ``session`` cookie."""
    cookies = dict(session=token)
    return cookies
|
|
|
|
|
|
def extract_sample_inputs(puzzle_source: str) -> List[str]:
    """Collect the text of every ``<pre>`` element that contains a ``<code>`` tag.

    Args:
        puzzle_source: HTML of a puzzle page.

    Returns:
        The text of each matching code block, in document order.
    """
    LOG.debug("Extracting puzzle inputs from puzzle source")
    soup = bs4.BeautifulSoup(puzzle_source, "html.parser")

    sample_inputs = []
    for pre_tag in soup.find_all("pre"):
        code_tag = pre_tag.code
        # <pre> blocks without a nested <code> are skipped.
        if code_tag:
            sample_inputs.append(code_tag.get_text())

    return sample_inputs
|
|
|
|
|
|
def ensure_ends_with(s: str, c: str) -> str:
    """Return ``s`` unchanged if it already ends with ``c``, else ``s`` with ``c`` appended."""
    return s if s.endswith(c) else s + c
|
|
|
|
|
|
def find_sample_inputs(date: PuzzleDate, token: str) -> List[str]:
    """Download the puzzle page for ``date`` and return the sample inputs found in it."""
    LOG.debug("Downloading sample inputs")
    found = extract_sample_inputs(get_puzzle_source(date, token))
    LOG.info(f"Found {len(found)} sample input(s)")

    return found
|
|
|
|
|
|
def interactively_filter_sample_inputs(sample_inputs: List[str]) -> Iterable[str]:
    """Show each sample input and yield only the ones the user accepts.

    Each candidate is printed, then the user is prompted on stdin. An empty
    answer or "y" accepts, "n" rejects (case-insensitive); any other answer
    repeats the prompt.
    """

    def prompt_for_choice():
        answer = None
        # Keep asking until one of the recognised answers is given.
        while answer not in ("", "y", "n"):
            answer = input("Download as sample input? [Y/n] ").lower()

        return answer != "n"

    for candidate in sample_inputs:
        print(candidate, end="\n\n")
        if prompt_for_choice():
            yield candidate
|
|
|
|
|
|
def save_sample_inputs(sample_inputs: Iterable[str], output_dir: pathlib.Path):
    """Write each sample input to ``sample-<n>.txt`` inside ``output_dir``.

    Files are numbered from 1 in iteration order, and every file is guaranteed
    to end with a newline.

    Args:
        sample_inputs: The sample inputs to write; may be a lazy iterable.
        output_dir: Existing directory to write the files into.
    """
    num_inputs = 0
    for num_inputs, sample_input in enumerate(sample_inputs, start=1):
        path = output_dir / f"sample-{num_inputs}.txt"
        # Text mode already translates "\n" to the platform line ending, so
        # appending os.linesep here would produce "\r\r\n" on Windows.
        data_to_write = ensure_ends_with(sample_input, "\n")
        with open(path, "w", encoding="utf-8") as sample_file:
            sample_file.write(data_to_write)

        LOG.debug(f"Wrote sample input {num_inputs} to {path}")

    LOG.info(f"Downloaded {num_inputs} sample input(s) successfully")
|
|
|
|
|
|
def download_sample_inputs(
    date: PuzzleDate, token: str, output_dir: pathlib.Path, *, interactive
):
    """Fetch the sample inputs for ``date`` and save them into ``output_dir``.

    When ``interactive`` is truthy the user is asked to confirm each sample
    before it is written.
    """
    found = find_sample_inputs(date, token)
    chosen = interactively_filter_sample_inputs(found) if interactive else found

    save_sample_inputs(chosen, output_dir)
|
|
|
|
|
|
def fetch_input(date: PuzzleDate, token: str, timeout: float = 30.0) -> str:
    """Download the puzzle input for ``date``.

    Args:
        date: The puzzle whose input should be fetched.
        token: Session token, sent as the ``session`` cookie.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests will wait indefinitely on a stalled connection.

    Returns:
        The response body as text.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status.
        requests.exceptions.Timeout: If the server does not respond within
            ``timeout`` seconds.
    """
    puzzle_url = get_puzzle_url(date)
    # The input lives at "<puzzle url>/input"; the base must end in "/" or
    # urljoin would replace the day segment instead of appending to it.
    input_url = urllib.parse.urljoin(ensure_ends_with(puzzle_url, "/"), "input")
    LOG.debug(f"Fetching puzzle input from {input_url}")

    res = requests.get(input_url, cookies=build_cookies(token), timeout=timeout)
    res.raise_for_status()
    return res.text
|
|
|
|
|
|
def download_input(date: PuzzleDate, token: str, output_dir: pathlib.Path):
    """Fetch the puzzle input for ``date`` and write it to ``output_dir/input.txt``.

    The written file is guaranteed to end with a newline.

    Args:
        date: The puzzle whose input should be downloaded.
        token: Session token used to authenticate the request.
        output_dir: Existing directory to write ``input.txt`` into.
    """
    LOG.debug("Downloading puzzle input")
    puzzle_input = fetch_input(date, token)
    # Text mode already translates "\n" to the platform line ending, so
    # appending os.linesep here would produce "\r\r\n" on Windows.
    data_to_write = ensure_ends_with(puzzle_input, "\n")
    path = output_dir / "input.txt"

    with open(path, "w", encoding="utf-8") as input_file:
        input_file.write(data_to_write)

    LOG.debug(f"Wrote puzzle input to {path}")
    LOG.info("Download puzzle inputs successfully")
|
|
|
|
|
|
def get_token_from_file(path: pathlib.Path) -> str:
    """Read a session token from ``path``.

    Args:
        path: File whose entire content (ignoring surrounding whitespace) is
            the token.

    Returns:
        The token with leading and trailing whitespace removed.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
        ValueError: If the stripped content still spans several lines, which
            means the file is unlikely to hold just a token.
    """
    with open(path, encoding="utf-8") as token_file:
        token = token_file.read().strip()

    if "\n" in token:
        raise ValueError("Token file probably does not contain a token; it has newlines")

    return token
|
|
|
|
|
|
def wait_for_next_puzzle():
    """Block until the next puzzle's release time, showing a spinner meanwhile.

    The release time is taken to be midnight (naive local time) on the date
    returned by ``PuzzleDate.get_next()``. A random delay of up to 1.5 seconds
    is added on top.
    """
    next_date = PuzzleDate.get_next()
    next_datetime = datetime.datetime(year=next_date.year, month=12, day=next_date.day)
    LOG.info(
        f"Next puzzle is at {next_datetime.isoformat()}. Waiting for it to be ready..."
    )

    time_to_wait = next_datetime.timestamp() - datetime.datetime.now().timestamp()
    # Wait some extra amount of time to not be rude to eric :(
    time_to_wait += random.randint(0, 1500) / 1000
    # The clock can pass the target between the two now() reads; time.sleep
    # raises ValueError for a negative duration, so never go below zero.
    time_to_wait = max(time_to_wait, 0.0)

    LOG.debug(f"Waiting {time_to_wait} seconds before fetching")
    with yaspin.yaspin(text="Waiting for the next puzzle...") as spinner:
        time.sleep(time_to_wait)
        spinner.ok("🎄 ")
|
|
|
|
|
|
def ensure_dir(path: pathlib.Path):
    """Create ``path`` and any missing parent directories.

    An already existing directory is left untouched.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    # exist_ok only tolerates an existing *directory*; a regular file at
    # `path` is still reported instead of being silently ignored.
    os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
def setup_logs(verbose: bool):
    """Install coloredlogs output at DEBUG level when ``verbose``, else INFO."""
    if verbose:
        level = logging.DEBUG
    else:
        level = logging.INFO

    coloredlogs.install(
        level=level,
        fmt="%(asctime)s aoc-get[%(process)d] %(levelname)s %(message)s",
    )
|
|
|
|
|
|
class FetchCommand(click.Command):
    """Click command that rejects ``year`` and ``day`` unless both or neither are given."""

    def invoke(self, ctx: click.Context):
        """Validate the year/day pairing, then run the wrapped command."""
        has_year = bool(ctx.params.get("year"))
        has_day = bool(ctx.params.get("day"))
        # Exactly one of the two being set is the only invalid combination.
        if has_year != has_day:
            raise click.ClickException("day and year must be specified together")

        return super().invoke(ctx)
|
|
|
|
|
|
@click.command(cls=FetchCommand)
@click.option("-v", "--verbose", is_flag=True)
@click.option(
    "--token", "passed_token", type=str, help="The advent of code session token."
)
@click.option(
    "-o",
    "output_dir",
    type=pathlib.Path,
    help="The output location of the puzzle inputs",
    default=".",
)
@click.option(
    "-O",
    "output_dir_make",
    type=pathlib.Path,
    help="The output location of the puzzle inputs. Unlike -o, this will create the directory for you",
    default=None,
)
@click.option(
    "--token-file",
    "token_path",
    type=pathlib.Path,
    default="~/.local/advent-of-code/session",
    help="The location of the advent of code session token. Ignored if --token is provided.",
)
@click.option("--wait", is_flag=True, help="Wait until new puzzle inputs are ready")
@click.option(
    "--inputonly",
    "input_only",
    is_flag=True,
    help="Don't attempt to fetch sample inputs",
)
@click.option(
    "-i",
    "--interactive",
    is_flag=True,
    help="Interactively select sample inputs to download",
)
@click.option("--year", type=int)
@click.option("--day", type=int)
def main(
    verbose: bool,
    passed_token: str,
    token_path: pathlib.Path,
    output_dir: pathlib.Path,
    output_dir_make: Optional[pathlib.Path],
    wait: bool,
    input_only: bool,
    interactive: bool,
    day: Optional[int],
    year: Optional[int],
):
    # CLI entry point: resolve the session token, optionally wait for the next
    # puzzle, then download the sample inputs (unless --inputonly) and the
    # puzzle input into the output directory. Exits with status 1 on failure.
    # (Kept as a comment rather than a docstring so the --help text is unchanged.)

    # Configure logging first so the token-loading messages below are emitted
    # with the requested level and format.
    setup_logs(verbose)

    token = passed_token
    if not token and not token_path:
        LOG.error("no token file given")
        sys.exit(1)
    elif not token:
        expanded_token_path = pathlib.Path(os.path.expanduser(token_path))
        LOG.debug(f"No token passed. Opening token from {token_path}")
        try:
            token = get_token_from_file(expanded_token_path)
        except (OSError, ValueError) as err:
            # OSError covers a missing file as well as unreadable ones
            # (permissions, path is a directory, ...).
            LOG.error(f"Failed to read token file: {err}")
            sys.exit(1)

    if wait:
        wait_for_next_puzzle()

    if output_dir_make:
        ensure_dir(output_dir_make)
        output_dir = output_dir_make

    specified_date = PuzzleDate(year=year, day=day) if day and year else None
    date_to_fetch = specified_date or PuzzleDate.get_latest()
    step = None
    try:
        step = "download sample inputs"
        if not input_only:
            download_sample_inputs(
                date_to_fetch, token, output_dir, interactive=interactive
            )
        step = "download input"
        download_input(date_to_fetch, token, output_dir)
    except requests.exceptions.RequestException as err:
        # RequestException also covers connection failures and timeouts, which
        # would otherwise surface as a raw traceback.
        LOG.error(f"Failed to {step}: {err}")
        if err.response is not None:
            LOG.debug(f"Response body: {err.response.text}")
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|