Skip to content

Commit

Permalink
Write JSON output to requirements.json
Browse files Browse the repository at this point in the history
  • Loading branch information
chrysle committed May 11, 2024
1 parent 2f0b266 commit 69c0cb9
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 68 deletions.
13 changes: 10 additions & 3 deletions piptools/scripts/compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
)
DEFAULT_REQUIREMENTS_FILE = "requirements.in"
DEFAULT_REQUIREMENTS_OUTPUT_FILE = "requirements.txt"
DEFAULT_REQUIREMENTS_OUTPUT_FILE_JSON = "requirements.json"
METADATA_FILENAMES = frozenset({"setup.py", "setup.cfg", "pyproject.toml"})


Expand Down Expand Up @@ -215,10 +216,16 @@ def cli(
# An output file must be provided for stdin
if src_files == ("-",):
raise click.BadParameter("--output-file is required if input is from stdin")
# Use default requirements output file if there is a setup.py the source file
# Use default requirements output file if the source file is a recognized
# packaging metadata file
elif os.path.basename(src_files[0]) in METADATA_FILENAMES:
file_name = os.path.join(
os.path.dirname(src_files[0]), DEFAULT_REQUIREMENTS_OUTPUT_FILE
os.path.dirname(src_files[0]),
(
DEFAULT_REQUIREMENTS_OUTPUT_FILE_JSON
if json
else DEFAULT_REQUIREMENTS_OUTPUT_FILE
),
)
# An output file must be provided if there are multiple source files
elif len(src_files) > 1:
Expand Down Expand Up @@ -297,7 +304,7 @@ def cli(
# Proxy with a LocalRequirementsRepository if --upgrade is not specified
# (= default invocation)
output_file_exists = os.path.exists(output_file.name)
if not upgrade and output_file_exists:
if not (upgrade or json) and output_file_exists:
output_file_is_empty = os.path.getsize(output_file.name) == 0
if upgrade_install_reqs and output_file_is_empty:
log.warning(
Expand Down
144 changes: 79 additions & 65 deletions piptools/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _iter_ireqs(
unsafe_packages: set[str],
markers: dict[str, Marker],
hashes: dict[InstallRequirement, set[str]] | None = None,
) -> Iterator[str, dict[str, str]]:
) -> Iterator[str] | Iterator[dict[str, str | list[str]]]:
# default values
unsafe_packages = unsafe_packages if self.allow_unsafe else set()
hashes = hashes or {}
Expand All @@ -194,12 +194,13 @@ def _iter_ireqs(
has_hashes = hashes and any(hash for hash in hashes.values())

yielded = False
for line in self.write_header():
yield line, {}
yielded = True
for line in self.write_flags():
yield line, {}
yielded = True
if not self.json_output:
for line in self.write_header():
yield line
yielded = True
for line in self.write_flags():
yield line
yielded = True

unsafe_requirements = unsafe_requirements or {
r for r in results if r.name in unsafe_packages
Expand All @@ -208,37 +209,39 @@ def _iter_ireqs(

if packages:
for ireq in sorted(packages, key=self._sort_key):
if has_hashes and not hashes.get(ireq):
yield MESSAGE_UNHASHED_PACKAGE, {}
if has_hashes and not hashes.get(ireq) and not self.json_output:
yield MESSAGE_UNHASHED_PACKAGE
warn_uninstallable = True
line, json = self._format_requirement(
formatted_req = self._format_requirement(
ireq, markers.get(key_from_ireq(ireq)), hashes=hashes
)
yield line, json
yield formatted_req
yielded = True

if unsafe_requirements:
yield "", {}

if not self.json_output:
yield ""
yielded = True
if has_hashes and not self.allow_unsafe:
yield MESSAGE_UNSAFE_PACKAGES_UNPINNED, {}
if has_hashes and not self.allow_unsafe and not self.json_output:
yield MESSAGE_UNSAFE_PACKAGES_UNPINNED
warn_uninstallable = True
else:
yield MESSAGE_UNSAFE_PACKAGES, {}
elif not self.json_output:
yield MESSAGE_UNSAFE_PACKAGES

for ireq in sorted(unsafe_requirements, key=self._sort_key):
ireq_key = key_from_ireq(ireq)
if not self.allow_unsafe:
yield comment(f"# {ireq_key}"), {}
if not self.allow_unsafe and not self.json_output:
yield comment(f"# {ireq_key}")
else:
line, json = self._format_requirement(
formatted_req = self._format_requirement(
ireq, marker=markers.get(ireq_key), hashes=hashes
)
yield line, json
yield formatted_req

# Yield even when there's no real content, so that blank files are written
if not yielded:
yield "", {}
yield ""

if warn_uninstallable:
log.warning(MESSAGE_UNINSTALLABLE)
Expand All @@ -252,40 +255,47 @@ def write(
hashes: dict[InstallRequirement, set[str]] | None,
) -> None:
output_structure = []
if not self.dry_run or self.json_output:
if not self.dry_run:
dst_file = io.TextIOWrapper(
self.dst_file,
encoding="utf8",
newline=self.linesep,
line_buffering=True,
)
try:
for line, ireq in self._iter_ireqs(
for formatted_req in self._iter_ireqs(
results, unsafe_requirements, unsafe_packages, markers, hashes
):
if self.dry_run:
if self.dry_run and not self.json_output:
# Bypass the log level to always print this during a dry run
log.log(line)
assert isinstance(formatted_req, str)
log.log(formatted_req)
else:
if not self.json_output:
log.info(line)
dst_file.write(unstyle(line))
dst_file.write("\n")
if self.json_output and ireq:
output_structure.append(ireq)
assert isinstance(formatted_req, str)
log.info(formatted_req)
dst_file.write(unstyle(formatted_req))
dst_file.write("\n")
else:
output_structure.append(formatted_req)
finally:
if not self.dry_run or self.json_output:
dst_file.detach()
if self.json_output:
json.dump(output_structure, dst_file, indent=4)
print(json.dumps(output_structure, indent=4))
if not self.dry_run:
dst_file.detach()

def _format_requirement(
self,
ireq: InstallRequirement,
marker: Marker | None = None,
hashes: dict[InstallRequirement, set[str]] | None = None,
unsafe: bool = False,
) -> tuple[str, dict[str, str | list[str]]]:
) -> str | dict[str, str | list[str]]:
"""Format a given ``InstallRequirement``.
:returns: A line or a JSON structure to be written to the output file.
"""
ireq_hashes = (hashes if hashes is not None else {}).get(ireq)

line = format_requirement(ireq, marker=marker, hashes=ireq_hashes)
Expand Down Expand Up @@ -326,36 +336,40 @@ def _format_requirement(
if self.annotate:
line = "\n".join(ln.rstrip() for ln in lines)

hashable = True
if ireq.link:
if ireq.link.is_vcs or (ireq.link.is_file and ireq.link.is_existing_dir()):
hashable = False
output_marker = ""
if marker:
output_marker = str(marker)
via = []
for parent_req in required_by:
if parent_req.startswith("-r "):
# Ensure paths to requirements files given are absolute
reqs_in_path = os.path.abspath(parent_req[len("-r ") :])
via.append(f"-r {reqs_in_path}")
else:
via.append(parent_req)
output_hashes = []
if ireq_hashes:
output_hashes = list(ireq_hashes)

ireq_json = {
"name": ireq.name,
"version": str(ireq.specifier).lstrip("=="),
"requirement": str(ireq.req),
"via": via,
"line": unstyle(line),
"hashable": hashable,
"editable": ireq.editable,
"hashes": output_hashes,
"marker": output_marker,
"unsafe": unsafe,
}
if self.json_output:
hashable = True
if ireq.link:
if ireq.link.is_vcs or (
ireq.link.is_file and ireq.link.is_existing_dir()
):
hashable = False
output_marker = ""
if marker:
output_marker = str(marker)
via = []
for parent_req in required_by:
if parent_req.startswith("-r "):
# Ensure paths to requirements files given are absolute
reqs_in_path = os.path.abspath(parent_req[len("-r ") :])
via.append(f"-r {reqs_in_path}")
else:
via.append(parent_req)
output_hashes = []
if ireq_hashes:
output_hashes = list(ireq_hashes)

ireq_json = {
"name": ireq.name,
"version": str(ireq.specifier).lstrip("=="),
"requirement": str(ireq.req),
"via": via,
"line": unstyle(line),
"hashable": hashable,
"editable": ireq.editable,
"hashes": output_hashes,
"marker": output_marker,
"unsafe": unsafe,
}
return ireq_json

return line, ireq_json
return line

0 comments on commit 69c0cb9

Please sign in to comment.