fix: wait to run soffice until there is no other soffice process running (#3287)

## Summary

This PR addresses an issue where the code could attempt to run `soffice`
in multiple processes at the same time; it closes #3284.
The fix is to add a wait mechanism for when another `soffice`
process is already running.

## Diagnosis of issue

- `soffice` can only have one process running at a time when the command
`soffice` is invoked as is.
- on the main branch, the function `partition.common.convert_office_doc`
simply spawns a subprocess to run the `soffice` command to convert a `doc`
or `ppt` file into `docx` or `pptx` format.
- if multiple partition calls process `doc` or `ppt` files and they all
try to spawn `soffice` subprocesses, only one will succeed while the
others simply fail and return 1 from the subprocess
- downstream, this leads to errors like `PackageNotFoundError:
Package not found at '/tmp/tmpac6lcu4w/document.docx'`

## Solution

While there are
[ways](https://www.reddit.com/r/libreoffice/comments/agk3os/how_to_open_more_than_one_calc_instance_under/)
to circumvent the single-process limit of `soffice` by setting a temporary
directory as the user installation environment, these kinds of solutions
rely on the internals of `soffice` and add maintenance cost to track its
changes.
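
For reference, the alternative this PR decides against could look roughly like the sketch below. It is only an illustration: `build_isolated_soffice_command` and `profile_dir` are hypothetical names, the `--headless`, `--convert-to` and `--outdir` arguments are assumed to match what `convert_office_doc` passes, and `-env:UserInstallation` is the LibreOffice bootstrap variable the linked thread relies on.

```python
import tempfile
from pathlib import Path
from typing import List, Optional


def build_isolated_soffice_command(
    input_filename: str,
    output_directory: str,
    target_format: str = "docx",
    profile_dir: Optional[str] = None,
) -> List[str]:
    """Build a soffice command that uses its own throwaway user profile."""
    # Hypothetical helper: each call gets a fresh profile directory unless one is given.
    profile = Path(profile_dir or tempfile.mkdtemp(prefix="soffice-profile-")).resolve()
    return [
        "soffice",
        # Point LibreOffice at a separate user installation so instances do not collide.
        f"-env:UserInstallation={profile.as_uri()}",
        "--headless",
        "--convert-to",
        target_format,
        "--outdir",
        output_directory,
        input_filename,
    ]
```

Every conversion would then need its own profile directory created and cleaned up, which is part of the maintenance cost mentioned above.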

This PR solves this problem by adding a wait mechanism:
- we first spawn a subprocess to run `soffice`
- if the `stdout` is empty and we still have wait time budget left, the
function first checks if there is another `soffice` running
  * if yes, the function waits for 0.1s before checking again;
  * if no, the function spawns a subprocess to run `soffice` and returns to
the beginning of this step
  * we need to return to the beginning to check if `stdout` is empty
because we could have another collision right after `soffice` becomes
available.
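
The steps above can be sketched in isolation as follows. This is a simplified illustration rather than the code in the diff: `run_with_wait`, `run_once` and `is_busy` are hypothetical names, with `run_once` standing in for the `soffice` subprocess call (returning its stripped `stdout`) and `is_busy` standing in for the check for a running `soffice` process.

```python
import time
from typing import Callable


def run_with_wait(
    run_once: Callable[[], str],
    is_busy: Callable[[], bool],
    timeout: float = 10.0,
    poll_interval: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Retry run_once until it yields non-empty output or the wait budget is spent."""
    message = run_once()  # first attempt, made unconditionally
    waited = 0.0
    # Empty output is treated as "the attempt did not run"; keep going while budget remains.
    while waited < timeout and message == "":
        waited += poll_interval
        if is_busy():
            sleep(poll_interval)  # another instance is running: wait, then check again
        else:
            message = run_once()  # nothing running: retry, then re-check the output
    return message
```

With `timeout=0` the loop body never runs, so only the first attempt is made; the timeout test in this PR exercises that case.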

## Test

This PR adds two unit tests.
Additionally this can be tested by running partition of `.doc` files
locally with multiprocessing.
badGarnet committed Jun 25, 2024
1 parent a7a53f6 commit c32aeaa
Showing 17 changed files with 113 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@

 ### Fixes

+* **Fix a bug where multiple `soffice` processes could be attempted** Add a wait mechanism in `convert_office_doc` so that the function first checks if another `soffice` is running already: if yes wait till the other process finishes or till the wait timeout before spawning a subprocess to run `soffice`
+
 ## 0.14.8

 ### Enhancements
4 changes: 3 additions & 1 deletion requirements/dev.txt
@@ -267,7 +267,9 @@ prompt-toolkit==3.0.47
     #   ipython
     #   jupyter-console
 psutil==6.0.0
-    # via ipykernel
+    # via
+    #   -c ./test.txt
+    #   ipykernel
 ptyprocess==0.7.0
     # via
     #   pexpect
2 changes: 1 addition & 1 deletion requirements/extra-paddleocr.txt
@@ -8,7 +8,7 @@ attrdict==2.0.1
     # via unstructured-paddleocr
 babel==2.15.0
     # via flask-babel
-bce-python-sdk==0.9.14
+bce-python-sdk==0.9.17
     # via visualdl
 blinker==1.8.2
     # via flask
4 changes: 2 additions & 2 deletions requirements/extra-pdf-image.txt
@@ -46,15 +46,15 @@ fsspec==2024.5.0
     #   -c ././deps/constraints.txt
     #   huggingface-hub
     #   torch
-google-api-core[grpc]==2.19.0
+google-api-core[grpc]==2.19.1
     # via google-cloud-vision
 google-auth==2.30.0
     # via
     #   google-api-core
     #   google-cloud-vision
 google-cloud-vision==3.7.2
     # via -r ./extra-pdf-image.in
-googleapis-common-protos==1.63.1
+googleapis-common-protos==1.63.2
     # via
     #   google-api-core
     #   grpcio-status
4 changes: 2 additions & 2 deletions requirements/ingest/chroma.txt
@@ -65,7 +65,7 @@ fsspec==2024.5.0
     #   huggingface-hub
 google-auth==2.30.0
     # via kubernetes
-googleapis-common-protos==1.63.1
+googleapis-common-protos==1.63.2
     # via opentelemetry-exporter-otlp-proto-grpc
 grpcio==1.64.1
     # via
@@ -232,7 +232,7 @@ starlette==0.37.2
     # via fastapi
 sympy==1.12.1
     # via onnxruntime
-tenacity==8.4.1
+tenacity==8.4.2
     # via chromadb
 tokenizers==0.19.1
     # via chromadb
2 changes: 1 addition & 1 deletion requirements/ingest/clarifai.txt
@@ -19,7 +19,7 @@ clarifai-grpc==10.5.3
     # via clarifai
 contextlib2==21.6.0
     # via schema
-googleapis-common-protos==1.63.1
+googleapis-common-protos==1.63.2
     # via clarifai-grpc
 grpcio==1.64.1
     # via clarifai-grpc
4 changes: 2 additions & 2 deletions requirements/ingest/embed-aws-bedrock.txt
@@ -120,7 +120,7 @@ requests==2.32.3
     #   langchain
     #   langchain-community
     #   langsmith
-s3transfer==0.10.1
+s3transfer==0.10.2
     # via boto3
 six==1.16.0
     # via
@@ -130,7 +130,7 @@ sqlalchemy==2.0.31
     # via
     #   langchain
     #   langchain-community
-tenacity==8.4.1
+tenacity==8.4.2
     # via
     #   langchain
     #   langchain-community
2 changes: 1 addition & 1 deletion requirements/ingest/embed-huggingface.txt
@@ -167,7 +167,7 @@ sqlalchemy==2.0.31
     #   langchain-community
 sympy==1.12.1
     # via torch
-tenacity==8.4.1
+tenacity==8.4.2
     # via
     #   langchain
     #   langchain-community
2 changes: 1 addition & 1 deletion requirements/ingest/embed-openai.txt
@@ -141,7 +141,7 @@ sqlalchemy==2.0.31
     # via
     #   langchain
     #   langchain-community
-tenacity==8.4.1
+tenacity==8.4.2
     # via
     #   langchain
     #   langchain-community
8 changes: 4 additions & 4 deletions requirements/ingest/embed-vertexai.txt
@@ -39,7 +39,7 @@ frozenlist==1.4.1
     # via
     #   aiohttp
     #   aiosignal
-google-api-core[grpc]==2.19.0
+google-api-core[grpc]==2.19.1
     # via
     #   google-cloud-aiplatform
     #   google-cloud-bigquery
@@ -76,12 +76,12 @@ google-resumable-media==2.7.1
     # via
     #   google-cloud-bigquery
     #   google-cloud-storage
-googleapis-common-protos[grpc]==1.63.1
+googleapis-common-protos[grpc]==1.63.2
     # via
     #   google-api-core
     #   grpc-google-iam-v1
     #   grpcio-status
-grpc-google-iam-v1==0.13.0
+grpc-google-iam-v1==0.13.1
     # via google-cloud-resource-manager
 grpcio==1.64.1
     # via
@@ -210,7 +210,7 @@ sqlalchemy==2.0.31
     # via
     #   langchain
     #   langchain-community
-tenacity==8.4.1
+tenacity==8.4.2
     # via
     #   langchain
     #   langchain-community
2 changes: 1 addition & 1 deletion requirements/ingest/embed-voyageai.txt
@@ -94,7 +94,7 @@ requests==2.32.3
     #   voyageai
 sqlalchemy==2.0.31
     # via langchain
-tenacity==8.4.1
+tenacity==8.4.2
     # via
     #   langchain
     #   langchain-core
4 changes: 2 additions & 2 deletions requirements/ingest/gcs.txt
@@ -42,7 +42,7 @@ fsspec==2024.5.0
     #   gcsfs
 gcsfs==2024.5.0
     # via -r ./ingest/gcs.in
-google-api-core==2.19.0
+google-api-core==2.19.1
     # via
     #   google-cloud-core
     #   google-cloud-storage
@@ -65,7 +65,7 @@ google-crc32c==1.5.0
     #   google-resumable-media
 google-resumable-media==2.7.1
     # via google-cloud-storage
-googleapis-common-protos==1.63.1
+googleapis-common-protos==1.63.2
     # via google-api-core
 idna==3.7
     # via
4 changes: 2 additions & 2 deletions requirements/ingest/google-drive.txt
@@ -15,7 +15,7 @@ charset-normalizer==3.3.2
     # via
     #   -c ./ingest/../base.txt
     #   requests
-google-api-core==2.19.0
+google-api-core==2.19.1
     # via google-api-python-client
 google-api-python-client==2.134.0
     # via -r ./ingest/google-drive.in
@@ -26,7 +26,7 @@ google-auth==2.30.0
     #   google-auth-httplib2
 google-auth-httplib2==0.2.0
     # via google-api-python-client
-googleapis-common-protos==1.63.1
+googleapis-common-protos==1.63.2
     # via google-api-core
 httplib2==0.22.0
     # via
1 change: 1 addition & 0 deletions requirements/test.in
@@ -8,6 +8,7 @@ flake8-print
 freezegun
 label_studio_sdk
 mypy
+psutil
 pydantic
 pytest-cov
 pytest-mock
4 changes: 3 additions & 1 deletion requirements/test.txt
@@ -71,7 +71,7 @@ mccabe==0.7.0
     # via flake8
 multidict==6.0.5
     # via yarl
-mypy==1.10.0
+mypy==1.10.1
     # via -r ./test.in
 mypy-extensions==1.0.0
     # via
@@ -92,6 +92,8 @@ platformdirs==3.10.0
     #   black
 pluggy==1.5.0
     # via pytest
+psutil==6.0.0
+    # via -r ./test.in
 pycodestyle==2.12.0
     # via
     #   flake8
57 changes: 49 additions & 8 deletions test_unstructured/partition/test_common.py
@@ -3,15 +3,18 @@
 import os
 import pathlib
 from dataclasses import dataclass
+from multiprocessing import Pool
 from unittest import mock

+import numpy as np
 import pytest
 from PIL import Image
 from unstructured_inference.inference import layout
 from unstructured_inference.inference.elements import TextRegion
 from unstructured_inference.inference.layout import DocumentLayout, PageLayout
 from unstructured_inference.inference.layoutelement import LayoutElement

+from test_unstructured.unit_utils import example_doc_path
 from unstructured.documents.coordinates import PixelSpace
 from unstructured.documents.elements import (
     TYPE_TO_TEXT_ELEMENT_MAP,
@@ -335,20 +338,58 @@ def test_normalize_layout_element_bulleted_list():
     ]


-class MockPopenWithError:
-    def __init__(self, *args, **kwargs):
-        pass
+class MockRunOutput:

-    def communicate(self):
-        return b"", b"an error occurred"
+    def __init__(self, returncode, stdout, stderr):
+        self.returncode = returncode
+        self.stdout = stdout
+        self.stderr = stderr


 def test_convert_office_doc_captures_errors(monkeypatch, caplog):
-    import subprocess
+    from unstructured.partition.common import subprocess

-    monkeypatch.setattr(subprocess, "Popen", MockPopenWithError)
+    def mock_run(*args, **kwargs):
+        return MockRunOutput(1, "an error occurred".encode(), "error details".encode())
+
+    monkeypatch.setattr(subprocess, "run", mock_run)
     common.convert_office_doc("no-real.docx", "fake-directory", target_format="docx")
-    assert "an error occurred" in caplog.text
+    assert "soffice failed to convert to format docx with code 1" in caplog.text
+
+
+def test_convert_office_docs_avoids_concurrent_call_to_soffice():
+    paths_to_save = [pathlib.Path(path) for path in ("/tmp/proc1", "/tmp/proc2", "/tmp/proc3")]
+    for path in paths_to_save:
+        path.mkdir(exist_ok=True)
+        (path / "simple.docx").unlink(missing_ok=True)
+    file_to_convert = example_doc_path("simple.doc")
+
+    with Pool(3) as pool:
+        pool.starmap(common.convert_office_doc, [(file_to_convert, path) for path in paths_to_save])
+
+    assert np.sum([(path / "simple.docx").is_file() for path in paths_to_save]) == 3
+
+
+def test_convert_office_docs_respects_wait_timeout():
+    paths_to_save = [
+        pathlib.Path(path) for path in ("/tmp/wait/proc1", "/tmp/wait/proc2", "/tmp/wait/proc3")
+    ]
+    for path in paths_to_save:
+        path.mkdir(parents=True, exist_ok=True)
+        (path / "simple.docx").unlink(missing_ok=True)
+    file_to_convert = example_doc_path("simple.doc")
+
+    with Pool(3) as pool:
+        pool.starmap(
+            common.convert_office_doc,
+            # set timeout to wait for soffice to be available to 0 so only one process can convert
+            # the doc file on the first try; then the catch all
+            [(file_to_convert, path, "docx", None, 0) for path in paths_to_save],
+        )
+
+    # because this test file is very small we could have occasions where two files are converted
+    # when one of the processes spawned just a little
+    assert np.sum([(path / "simple.docx").is_file() for path in paths_to_save]) < 3


 class MockDocxEmptyTable:
45 changes: 36 additions & 9 deletions unstructured/partition/common.py
@@ -6,9 +6,11 @@
 from datetime import datetime
 from io import BufferedReader, BytesIO, TextIOWrapper
 from tempfile import SpooledTemporaryFile
+from time import sleep
 from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast

 import emoji
+import psutil

mgraczyk Jun 28, 2024

@badGarnet
Looks like psutil needs to be included in the non-dev requirements as well?
This breaks loading the latest release

$ pip install --upgrade unstructured
# ...
$ python
Python 3.12.4 (main, Jun  6 2024, 18:26:44) [Clang 15.0.0 (clang-1500.1.0.2.5)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from unstructured.partition import common
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/michael/dev/rfp-tool/.venv/lib/python3.12/site-packages/unstructured/partition/common.py", line 13, in <module>
    import psutil
ModuleNotFoundError: No module named 'psutil'
>>>
 from tabulate import tabulate

 from unstructured.documents.coordinates import CoordinateSystem, PixelSpace
@@ -365,11 +367,22 @@ def remove_element_metadata(layout_elements) -> list[Element]:
     return elements


+def _is_soffice_running():
+    for proc in psutil.process_iter():
+        try:
+            if "soffice" in proc.name().lower():
+                return True
+        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+            pass
+    return False
+
+
 def convert_office_doc(
     input_filename: str,
     output_directory: str,
     target_format: str = "docx",
     target_filter: Optional[str] = None,
+    wait_for_soffice_ready_time_out: int = 10,
 ):
     """Converts a .doc file to a .docx file using the libreoffice CLI.

@@ -384,6 +397,8 @@ def convert_office_doc(
     target_filter: str
         The output filter name to use when converting. See references below
         for details.
+    wait_for_soffice_ready_time_out: int
+        The max wait time in seconds for soffice to become available to run

     References
     ----------
@@ -407,12 +422,21 @@ def convert_office_doc(
         input_filename,
     ]
     try:
-        process = subprocess.Popen(
-            command,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-        )
-        output, error = process.communicate()
+        # only one soffice process can be ran
+        wait_time = 0
+        sleep_time = 0.1
+        output = subprocess.run(command, capture_output=True)
+        message = output.stdout.decode().strip()
+        # we can't rely on returncode unfortunately because on macOS it would return 0 even when the
+        # command failed to run; instead we have to rely on the stdout being empty as a sign of the
+        # process failed
+        while (wait_time < wait_for_soffice_ready_time_out) and (message == ""):
+            wait_time += sleep_time
+            if _is_soffice_running():
+                sleep(sleep_time)
+            else:
+                output = subprocess.run(command, capture_output=True)
+                message = output.stdout.decode().strip()
     except FileNotFoundError:
         raise FileNotFoundError(
             """soffice command was not found. Please install libreoffice
@@ -423,9 +447,12 @@ def convert_office_doc(
 - Debian: https://wiki.debian.org/LibreOffice""",
         )

-    logger.info(output.decode().strip())
-    if error:
-        logger.error(error.decode().strip())
+    logger.info(message)
+    if output.returncode != 0 or message == "":
+        logger.error(
+            "soffice failed to convert to format %s with code %i", target_format, output.returncode
+        )
+        logger.error(output.stderr.decode().strip())


 def exactly_one(**kwargs: Any) -> None:
