Skip to content

Commit

Permalink
Rework sync vs async, use threads for deps
Browse files Browse the repository at this point in the history
  • Loading branch information
thatch committed Jan 24, 2024
1 parent abd4e98 commit 38b6f10
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 150 deletions.
102 changes: 95 additions & 7 deletions honesty/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
"""

import asyncio
import json
import os
import posixpath
import urllib.parse
from pathlib import Path
from tempfile import mkstemp
from typing import Any, Dict, Optional

import aiohttp
import appdirs
from keke import kev, ktrace
from requests.sessions import Session


def cache_dir(pkg: str) -> Path:
Expand All @@ -35,6 +39,7 @@ def __init__(
json_index_url: Optional[str] = None,
fresh_index: bool = False,
aiohttp_client_session_kwargs: Optional[Dict[str, Any]] = None,
sync_session: Optional[Session] = None,
) -> None:
if not cache_dir:
cache_dir = os.environ.get("HONESTY_CACHE", DEFAULT_CACHE_DIR)
Expand Down Expand Up @@ -63,11 +68,93 @@ def __init__(
}
if aiohttp_client_session_kwargs is not None:
cskwargs.update(aiohttp_client_session_kwargs)
self.session = aiohttp.ClientSession(**cskwargs)

def fetch(self, pkg: str, url: Optional[str]) -> Path:
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.async_fetch(pkg, url))
self._cskwargs = cskwargs
if sync_session is None:
sync_session = Session()
self.sync_session = sync_session

@ktrace("pkg", "url")
def fetch(
    self, pkg: str, url: Optional[str], filename: Optional[str] = None
) -> Path:
    """
    Synchronously fetch `url` for `pkg` into the cache and return its path.

    This duplicates the async_fetch code but using requests (via
    `self.sync_session`), for better non-async support.

    Args:
        pkg: Package name; used to build the package index url and the
            cache subdirectory.
        url: Url to fetch.  `None` means the package's index page.  A
            relative url is resolved against the package's index page.
        filename: Name to store the file under.  Defaults to the basename
            of the resolved url, or "index.html" when that is empty.

    Returns:
        Path of the cached file.

    Raises:
        NotImplementedError: if `pkg` contains "&" or "#".
        Exception: if the saved `.hdrs` file has content but neither an
            "etag" nor a "last-modified" key.
        Whatever `resp.raise_for_status()` raises on an error response.
    """
    # Because parse_index doesn't understand entities, there are some urls
    # that we currently get that we shouldn't bother fetching.
    if "&" in pkg or "#" in pkg:
        raise NotImplementedError("parse_index does not handle entities yet")

    pkg_url = urllib.parse.urljoin(self.index_url, f"{pkg}/")
    if url is None:
        url = pkg_url
    else:
        # pypi simple gives full urls, but if your mirror gives relative ones,
        # it's relative to the package's index page (which has trailing slash)
        url = urllib.parse.urljoin(pkg_url, url)

    if not filename:
        filename = posixpath.basename(url)

    output_dir = self.cache_path / cache_dir(pkg)
    output_dir.mkdir(parents=True, exist_ok=True)

    # The basename of a url ending in "/" is empty; store that as index.html.
    stored_name = filename or "index.html"
    output_file = output_dir / stored_name

    # Don't bother cache-freshening if a file exists under its final name.
    if output_file.exists() and not self._is_index_filename(filename or ""):
        return output_file

    headers = {}
    hdrs_file = output_dir / (stored_name + ".hdrs")
    if output_file.exists():
        # If things got out of sync, we don't want to return a nonexisting
        # output_file...
        if hdrs_file.exists():
            hdrs = json.loads(hdrs_file.read_text())
            if "etag" in hdrs:
                headers = {"If-None-Match": hdrs["etag"]}
            elif "last-modified" in hdrs:
                headers = {"If-Modified-Since": hdrs["last-modified"]}
            elif hdrs:
                raise Exception(f"Unknown headers {hdrs!r}")
            # An empty dict is what this method itself saves when the server
            # sent no validators; fall through to an unconditional request.

    # TODO reconsider timeout
    with kev("get", have_headers=bool(headers)):
        resp = self.sync_session.get(
            url, stream=True, headers=headers, timeout=None
        )

    resp.raise_for_status()
    if resp.status_code == 304:
        assert output_file.exists()
        return output_file

    # TODO rethink how we write/cleanup these temp files
    (fd, name) = mkstemp(f".{os.getpid()}", prefix=stored_name, dir=output_dir)
    try:
        # Close (and therefore flush) the temp file before renaming it, so
        # the final name never points at partially-written data and the
        # rename also works on Windows, which refuses to replace open files.
        with os.fdopen(fd, "wb") as f:
            with kev("get streaming"):
                for chunk in resp.iter_content(1024 * 1024):
                    f.write(chunk)

        # Last-writer-wins semantics, even on Windows
        with kev("replace"):
            os.replace(name, output_file)
    except BaseException:
        # Don't leave a stray temp file behind on a failed download.
        try:
            os.unlink(name)
        except OSError:
            pass
        raise

    headers = {}
    if "etag" in resp.headers:
        headers["etag"] = resp.headers["etag"]
    elif "last-modified" in resp.headers:
        headers["last-modified"] = resp.headers["last-modified"]
    # Don't bother with replace here, although this should happen after the
    # main file gets replaced.
    hdrs_file.write_text(json.dumps(headers))

    return output_file

async def async_fetch(self, pkg: str, url: Optional[str]) -> Path:
"""
Expand Down Expand Up @@ -102,7 +189,7 @@ async def async_fetch(self, pkg: str, url: Optional[str]) -> Path:
output_file = output_dir / (filename or "index.html")

if not output_file.exists() or (
self.fresh_index and self._is_index_filename(filename)
self.fresh_index and self._is_index_filename(filename or "")
):
async with self.session.get(
url, raise_for_status=True, timeout=None
Expand All @@ -123,10 +210,11 @@ def __enter__(self) -> "Cache":
return self

def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> Any:
loop = asyncio.get_event_loop()
loop.run_until_complete(self.session.close())
# TODO what is the right return value?
return

async def __aenter__(self) -> "Cache":
    """Create the aiohttp client session from the stored kwargs; return self."""
    self.session = aiohttp.ClientSession(**self._cskwargs)
    return self

async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> Any:
Expand Down
53 changes: 30 additions & 23 deletions honesty/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@
from datetime import datetime, timezone
from enum import Enum, IntEnum
from pathlib import Path
from typing import Any, List, Optional, Set, Tuple
from typing import Any, IO, List, Optional, Set, Tuple

import aiohttp.client_exceptions

import click
import keke
from packaging.utils import canonicalize_name

from packaging.version import Version

from .api import async_download_many
from .archive import extract_and_get_names
from .cache import Cache
from .checker import guess_license, has_nativemodules, is_pep517, run_checker
from .deps import DepEdge, DepNode, DepWalker, print_deps, print_flat_deps
from .deps import DepWalker, is_canonical, print_deps, print_flat_deps
from .releases import async_parse_index, FileType, Package, parse_index
from .vcs import CloneAnalyzer, extract2

Expand Down Expand Up @@ -52,9 +54,16 @@ def dataclass_default(obj: Any) -> Any:


@click.group()
@click.pass_context
@click.option(
"--trace", type=click.File("w"), help="Write chrome trace to this filename"
)
@click.option("--verbose", is_flag=True, help="Verbose logging")
@click.version_option(__version__)
def cli() -> None:
pass
def cli(ctx: click.Context, trace: Optional[IO[str]], verbose: bool) -> None:
if trace:
ctx.with_resource(keke.TraceOutput(trace))
logging.basicConfig(level=logging.DEBUG if verbose else logging.WARNING)


@cli.command(help="List available archives")
Expand Down Expand Up @@ -429,15 +438,15 @@ def checkcache() -> None:
@click.option("--historical", help="yyyy-mm-dd of a historical date to simulate")
@click.option("--have", help="pkg==ver to assume already installed", multiple=True)
@click.option("--use-json", is_flag=True, default=True, show_default=True)
@click.argument("package_name")
@click.argument("reqs", nargs=-1)
def deps(
include_extras: bool,
verbose: bool,
flat: bool,
pick: bool,
python_version: str,
sys_platform: str,
package_name: str,
reqs: List[str],
historical: str,
have: List[str],
use_json: bool,
Expand All @@ -453,40 +462,38 @@ def deps(
trim_newer = None

def current_versions_callback(p: str) -> Optional[str]:
assert is_canonical(p)
for x in have:
k, _, v = x.partition("==")
if k == p:
# TODO canonicalize
if canonicalize_name(k) == p:
# TODO canonicalize earlier
return v
return None

# TODO platform option
# TODO something that understands pep 517 requirements for building
# TODO move this out of cmdline into deps.py
# TODO a way to specify that you already have certain versions of packages,
# to prefer them.

seen: Set[Tuple[str, Optional[Tuple[str, ...]], Version]] = set()
assert python_version.count(".") == 2
deptree = DepWalker(
package_name,
walker = DepWalker(
python_version,
sys_platform,
only_first=pick,
trim_newer=trim_newer,
).walk(
)
walker.enqueue(reqs)
deptree = walker.walk(
include_extras,
current_versions_callback=current_versions_callback,
use_json=use_json,
)
# TODO record constraints on DepEdge, or put in lib to avoid this nonsense
fake_root = DepNode("", version=Version("0"), deps=[DepEdge(target=deptree)])
if pick:
print(f"{deptree.name}=={deptree.version}")
elif flat:
print_flat_deps(fake_root, seen)
else:
print_deps(fake_root, seen)
with keke.kev("print"):
if pick:
# TODO this is completely wrong
print(f"{deptree.name}=={deptree.version}")
elif flat:
print_flat_deps(walker.root, seen)
else:
print_deps(walker.root, seen, walker.known_conflicts)


@cli.command(help="Guess what git rev corresponds to a release")
Expand Down
Loading

0 comments on commit 38b6f10

Please sign in to comment.