Skip to content

Commit

Permalink
Unify most of fetch and async_fetch
Browse files Browse the repository at this point in the history
Also probably fixes a bug on windows where the tempfile was left open.
  • Loading branch information
thatch committed Feb 6, 2024
1 parent db43a05 commit a35ac70
Showing 1 changed file with 97 additions and 48 deletions.
145 changes: 97 additions & 48 deletions honesty/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
"""

import json
import logging
import os
import posixpath
import urllib.parse
from dataclasses import dataclass
from pathlib import Path
from tempfile import mkstemp
from typing import Any, Dict, Optional
Expand All @@ -30,6 +32,16 @@ def cache_dir(pkg: str) -> Path:
)
BUFFER_SIZE = 4096 * 1024 # 4M

LOG = logging.getLogger(__name__)


@dataclass
class _Prefetch:
url: str
local_path: Path
headers: Optional[Dict[str, str]]
needs_recheck: bool


class Cache:
def __init__(
Expand Down Expand Up @@ -76,13 +88,9 @@ def __init__(
sync_session.mount("https://", HTTPAdapter(pool_maxsize=100))
self.sync_session = sync_session

@ktrace("pkg", "url")
def fetch(
def _fetch_common(
self, pkg: str, url: Optional[str], filename: Optional[str] = None
) -> Path:
# This duplicates the async_fetch code but using requests, for better
# non-async support.

) -> _Prefetch:
# Because parse_index doesn't understand entities, there are some urls
# that we currently get that we shouldn't bother fetching.
if "&" in pkg or "#" in pkg:
Expand All @@ -105,8 +113,11 @@ def fetch(
output_file = output_dir / (filename or "index.html")

# Don't bother cache-freshening if a file exists under its final name.
# If client cares about checking size and hash then it can do so.
if output_file.exists() and not self._is_index_filename(filename or ""):
return output_file
return _Prefetch(
url=url, local_path=output_file, headers=None, needs_recheck=False
)

headers = {}
hdrs_file = output_dir / ((filename or "index.html") + ".hdrs")
Expand All @@ -123,30 +134,51 @@ def fetch(
# else:
# raise Exception(f"Unknown headers {hdrs!r}")

# TODO what does True mean here?
return _Prefetch(
url=url, local_path=output_file, headers=headers, needs_recheck=True
)

@ktrace("pkg", "url")
def fetch(
self, pkg: str, url: Optional[str], filename: Optional[str] = None
) -> Path:
# This duplicates the async_fetch code but using requests, for better
# non-async support.

pre = self._fetch_common(pkg, url, filename)
if not pre.needs_recheck:
LOG.debug("%s already downloaded to %s", url, pre.local_path)
return pre.local_path

# TODO reconsider timeout
with kev("get", have_headers=bool(headers), url=url):
with kev("get", have_headers=bool(pre.headers), url=pre.url):
resp = self.sync_session.get(
url, stream=True, headers=headers, timeout=None
pre.url, stream=True, headers=pre.headers, timeout=None
)

resp.raise_for_status()
if resp.status_code == 304:
assert output_file.exists()
# print("used 304 for", url)
return output_file
assert pre.local_path.exists()
LOG.debug("%s got 304, using %s", url, pre.local_path)
return pre.local_path

# TODO rethink how we write/cleanup these temp files
(fd, name) = mkstemp(
f".{os.getpid()}", prefix=(filename or "index.html"), dir=output_dir
f".{os.getpid()}",
prefix=pre.local_path.name,
dir=pre.local_path.parent,
)
f = os.fdopen(fd, "wb")
with kev("stream_body"):
for chunk in resp.iter_content(1024 * 1024):
f.write(chunk)

f.close()

# Last-writer-wins semantics, even on Windows
with kev("replace"):
os.replace(name, output_file)
os.replace(name, pre.local_path)

headers = {}
if "etag" in resp.headers:
Expand All @@ -155,9 +187,13 @@ def fetch(
headers["last-modified"] = resp.headers["last-modified"]
# Don't bother with replace here, although this should happen after the
# main file gets replaced.
hdrs_file = Path(pre.local_path.parent, pre.local_path.name + ".hdrs")
hdrs_file.write_text(json.dumps(headers))
LOG.debug(
"%s newly downloaded to %s, resp headers %s", url, pre.local_path, headers
)

return output_file
return pre.local_path

async def async_fetch(self, pkg: str, url: Optional[str]) -> Path:
"""
Expand All @@ -171,40 +207,53 @@ async def async_fetch(self, pkg: str, url: Optional[str]) -> Path:
be concurrent-safe (last one wins).
"""

# Because parse_index doesn't understand entities, there are some urls
# that we currently get that we shouldn't bother fetching.
if "&" in pkg or "#" in pkg:
raise NotImplementedError("parse_index does not handle entities yet")

pkg_url = urllib.parse.urljoin(self.index_url, f"{pkg}/")
if url is None:
url = pkg_url
else:
# pypi simple gives full urls, but if your mirror gives relative ones,
# it's relative to the package's index page (which has trailing slash)
url = urllib.parse.urljoin(pkg_url, url)

filename = posixpath.basename(url)

output_dir = self.cache_path / cache_dir(pkg)
output_dir.mkdir(parents=True, exist_ok=True)

output_file = output_dir / (filename or "index.html")
pre = self._fetch_common(pkg, url, filename=None)
if not pre.needs_recheck:
LOG.debug("%s already downloaded to %s", url, pre.local_path)
return pre.local_path

assert url
async with self.session.get(
url, raise_for_status=True, timeout=None, headers=pre.headers
) as resp:
if resp.status == 304:
assert pre.local_path.exists()
LOG.debug("%s got 304, using %s", url, pre.local_path)
return pre.local_path

(fd, name) = mkstemp(
f".{os.getpid()}",
prefix=pre.local_path.name,
dir=pre.local_path.parent,
)
f = os.fdopen(fd, "wb")
with kev("stream_body"):
async for chunk in resp.content.iter_any():
f.write(chunk)

f.close()

# Last-writer-wins semantics, even on Windows
with kev("replace"):
os.replace(name, pre.local_path)

headers = {}
if "etag" in resp.headers:
headers["etag"] = resp.headers["etag"]
elif "last-modified" in resp.headers:
headers["last-modified"] = resp.headers["last-modified"]
# Don't bother with replace here, although this should happen after the
# main file gets replaced.
hdrs_file = Path(pre.local_path.parent, pre.local_path.name + ".hdrs")
hdrs_file.write_text(json.dumps(headers))
LOG.debug(
"%s newly downloaded to %s, resp headers %s",
url,
pre.local_path,
headers,
)

if not output_file.exists() or (
self.fresh_index and self._is_index_filename(filename or "")
):
async with self.session.get(
url, raise_for_status=True, timeout=None
) as resp:
tmp = f"{output_file}.{os.getpid()}"
with open(tmp, "wb") as f:
async for chunk in resp.content.iter_any():
f.write(chunk)
# Last-writer-wins semantics, even on Windows
os.replace(tmp, output_file)

return output_file
return pre.local_path

def _is_index_filename(self, name: str) -> bool:
return name in ("", "json", "691json")
Expand Down

0 comments on commit a35ac70

Please sign in to comment.