Skip to content

Commit

Permalink
Add support for concurrent processing of pages.
Browse files Browse the repository at this point in the history
setup.py
    Add new _apply_pages.py to wheels/installs.
src/__init__.py
    New top-level apply_pages(). Also convenience fn get_text() which uses
    apply_pages().
    Unlike Python's `multiprocessing` module, we also support passing keyword
    args to functions in the worker processes.
src/_apply_pages.py
    New, contains implementation of apply_pages().
tests/test_pylint.py
    Avoid pylint failure by disabling `R0801: Similar lines in 2 files`.
tests/test_textextract.py
    Test get_text() and show timings.
src/fitz___init__.py
tests/conftest.py
    Use functions to manipulate _g_log_items so that things work
    even when using `fitz` alias.

Timings for 8-core MacOS-arm64 and PDF spec:
    method='mp' (Python's `multiprocessing` module): 3.3x.
    method='fork': 3.6x.
  • Loading branch information
julian-smith-artifex-com committed May 22, 2024
1 parent 838d8f1 commit a799bde
Show file tree
Hide file tree
Showing 7 changed files with 525 additions and 9 deletions.
11 changes: 8 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,21 +606,26 @@ def add( ret, from_, to_):

if path_so_leaf_b:
# Add rebased implementation files.
add( ret_p, f'{g_root}/src/fitz___init__.py', 'fitz/__init__.py') # For `fitz` module alias.
add( ret_p, f'{g_root}/src/fitz_table.py', 'fitz/table.py') # For `fitz` module alias.
add( ret_p, f'{g_root}/src/fitz_utils.py', 'fitz/utils.py') # For `fitz` module alias.
to_dir = 'pymupdf/'
add( ret_p, f'{g_root}/src/__init__.py', to_dir)
add( ret_p, f'{g_root}/src/__main__.py', to_dir)
add( ret_p, f'{g_root}/src/pymupdf.py', to_dir)
add( ret_p, f'{g_root}/src/table.py', to_dir)
add( ret_p, f'{g_root}/src/utils.py', to_dir)
add( ret_p, f'{g_root}/src/_apply_pages.py', to_dir)
add( ret_p, f'{g_root}/src/build/extra.py', to_dir)
add( ret_p, f'{g_root}/src/build/{path_so_leaf_b}', to_dir)

# Add support for `fitz` backwards compatibility.
add( ret_p, f'{g_root}/src/fitz___init__.py', 'fitz/__init__.py')
add( ret_p, f'{g_root}/src/fitz_table.py', 'fitz/table.py')
add( ret_p, f'{g_root}/src/fitz_utils.py', 'fitz/utils.py')

if mupdf_local:
# Add MuPDF Python API.
add( ret_p, f'{mupdf_build_dir}/mupdf.py', to_dir)

# Add MuPDF shared libraries.
if windows:
wp = pipcl.wdev.WindowsPython()
add( ret_p, f'{mupdf_build_dir}/_mupdf.pyd', to_dir)
Expand Down
209 changes: 206 additions & 3 deletions src/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import string
import sys
import tarfile
import time
import typing
import warnings
import weakref
Expand Down Expand Up @@ -61,8 +62,19 @@ def _set_stream(name, default):
_g_out_log = _set_stream('PYMUPDF_LOG', sys.stdout)
_g_out_message = _set_stream('PYMUPDF_MESSAGE', sys.stdout)

# Log lines captured by `log()`. Always a list; `log()` only appends to it
# while `_g_log_items_active` is true (toggled with `_log_items_active()`, for
# example by the test suite).
_g_log_items = list()
_g_log_items_active = False

def _log_items():
    '''
    Returns the list that `log()` appends captured lines to.

    The list object itself is returned, not a copy.
    '''
    captured = _g_log_items
    return captured

def _log_items_active(active):
    '''
    Sets whether `log()` should capture its output lines.

    Args:
        active:
            If true, subsequent `log()` calls also append their text to the
            list returned by `_log_items()`.
    '''
    # Rebind the module-level flag rather than creating a local.
    global _g_log_items_active
    _g_log_items_active = active

def _log_items_clear():
    '''
    Discards all captured log lines.

    The list is emptied in place, so a reference obtained earlier from
    `_log_items()` also sees the list become empty.
    '''
    _g_log_items.clear()


def log( text='', caller=1):
'''
Expand All @@ -73,7 +85,7 @@ def log( text='', caller=1):
line = frame_record.lineno
function = frame_record.function
text = f'{filename}:{line}:{function}: {text}'
if _g_log_items is not None:
if _g_log_items_active:
_g_log_items.append(text)
print(text, file=_g_out_log)
_g_out_log.flush()
Expand Down Expand Up @@ -20942,6 +20954,197 @@ def vdist(dir, a, b):
return mupdf.fz_abs(dx * dir.y + dy * dir.x)


def apply_pages(
        path,
        pagefn,
        *,
        pagefn_args=(),
        pagefn_kwargs=None,
        initfn=None,
        initfn_args=(),
        initfn_kwargs=None,
        pages=None,
        method='single',
        concurrency=None,
        _stats=False,
        ):
    '''
    Returns list of results from `pagefn()`, optionally using concurrency for
    speed.

    Args:
        path:
            Path of document.
        pagefn:
            Function to call for each page; is passed (page, *pagefn_args,
            **pagefn_kwargs). Return value is added to list that we return. If
            `method` is not 'single', must be a top-level function - nested
            functions don't work with concurrency.
        pagefn_args
        pagefn_kwargs:
            Additional args to pass to `pagefn`. Must be picklable.
            `pagefn_kwargs` may be None, which is treated as no keyword args.
        initfn:
            If true, called once before any page is processed; is passed
            (*initfn_args, **initfn_kwargs). With method 'single' it is called
            in the current process, otherwise once in each worker process.
        initfn_args
        initfn_kwargs:
            Args to pass to initfn. Must be picklable. `initfn_kwargs` may be
            None, which is treated as no keyword args.
        pages:
            List of page numbers to process, or None to include all pages.
        method:
            'single'
                Do not use concurrency.
            'mp'
                Operate concurrently using Python's `multiprocessing` module.
            'fork'
                Operate concurrently using custom implementation with
                `os.fork()`. Does not work on Windows.
        concurrency:
            Number of worker processes to use when operating concurrently. If
            None, we use the number of available CPUs.
        _stats:
            Internal, may change or be removed. If true, we output simple
            timing diagnostics.

    Raises:
        ValueError:
            If `method` is not one of 'single', 'mp' or 'fork'.

    Note: We require a file path rather than a Document, because Document
    instances do not work properly after a fork - internal file descriptor
    offsets are shared between the parent and child processes.
    '''
    # Validate before doing any work. An `assert` here would be stripped by
    # `python -O` and leave `ret` unbound.
    if method not in ('single', 'mp', 'fork'):
        raise ValueError(f'Unrecognised {method=}.')

    # Defaults are None rather than dict() so that no dict instance is shared
    # between calls.
    if pagefn_kwargs is None:
        pagefn_kwargs = dict()
    if initfn_kwargs is None:
        initfn_kwargs = dict()

    if _stats:
        t0 = time.time()

    if method == 'single':
        if initfn:
            initfn(*initfn_args, **initfn_kwargs)
        # Close the document deterministically once all pages are processed.
        with Document(path) as document:
            if pages is None:
                page_iter = document
            else:
                # Honour the caller's page selection, in the given order.
                page_iter = (document.load_page(pno) for pno in pages)
            ret = [
                    pagefn(page, *pagefn_args, **pagefn_kwargs)
                    for page in page_iter
                    ]

    else:
        # Use concurrency.
        #
        from . import _apply_pages

        if pages is None:
            # Workers are given explicit page numbers, so we need the page
            # count.
            if _stats:
                t = time.time()
            with Document(path) as document:
                num_pages = len(document)
                pages = list(range(num_pages))
            if _stats:
                t = time.time() - t
                log(f'{t:.2f}s: count pages.')

        if _stats:
            t = time.time()

        # Both implementations take the same arguments.
        if method == 'mp':
            worker = _apply_pages._multiprocessing
        else:
            worker = _apply_pages._fork
        ret = worker(
                path,
                pages,
                pagefn,
                pagefn_args,
                pagefn_kwargs,
                initfn,
                initfn_args,
                initfn_kwargs,
                concurrency,
                _stats,
                )

        if _stats:
            t = time.time() - t
            log(f'{t:.2f}s: work.')

    if _stats:
        t = time.time() - t0
        log(f'{t:.2f}s: total.')
    return ret


def get_text(
        path,
        *,
        pages=None,
        method='single',
        concurrency=None,

        option='text',
        clip=None,
        flags=None,
        textpage=None,
        sort=False,
        delimiters=None,

        _stats=False,
        ):
    '''
    Returns list of results from `Page.get_text()`, optionally using
    concurrency for speed.

    This is a thin wrapper around `apply_pages()` with `Page.get_text` as the
    per-page function.

    Args:
        path:
            Path of document.
        pages:
            List of page numbers to process, or None to include all pages.
        method:
            'single'
                Do not use concurrency.
            'mp'
                Operate concurrently using Python's `multiprocessing` module.
            'fork'
                Operate concurrently using custom implementation with
                `os.fork`. Does not work on Windows.
        concurrency:
            Number of worker processes to use when operating concurrently. If
            None, we use the number of available CPUs.
        option
        clip
        flags
        textpage
        sort
        delimiters:
            Passed as keyword args to each internal call to `Page.get_text()`.
        _stats:
            Passed through to `apply_pages()`.
    '''
    # Keyword args forwarded unchanged to Page.get_text() for every page.
    get_text_kwargs = {
            'option': option,
            'clip': clip,
            'flags': flags,
            'textpage': textpage,
            'sort': sort,
            'delimiters': delimiters,
            }

    results = apply_pages(
            path,
            Page.get_text,
            pagefn_kwargs=get_text_kwargs,
            pages=pages,
            method=method,
            concurrency=concurrency,
            _stats=_stats,
            )
    return results


class TOOLS:
'''
We use @staticmethod to avoid the need to create an instance of this class.
Expand Down
Loading

0 comments on commit a799bde

Please sign in to comment.