1. add option parallel for Data and similar classes
VanyaBelyaev committed Aug 24, 2022
1 parent 35b1c40 commit 7a6ae97
Showing 5 changed files with 145 additions and 40 deletions.
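
In short, the commit threads a new `parallel` flag from `Data` and similar classes (including the LHCb helper in `ostap/contribs/lhcb/data.py`) down to `Files.__init__`, where the file collection can be dispatched to a `WorkManager` instead of the plain sequential loop. A minimal usage sketch, assuming an installed `ostap`; the tree name and file pattern are placeholders, not taken from the commit:

```python
# Hypothetical usage of the new option; 'MyTuple' and the glob pattern are illustrative.
from ostap.trees.data_utils import Data

data = Data ( chain    = 'MyTuple'           ,   # name of the TTree/TChain to collect
              files    = '/some/path/*.root' ,   # file pattern(s)
              parallel = True                )   # NEW: collect the files in parallel

print ( data )         # summary of the collected files
print ( data.files )   # the tuple of resolved file names
```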
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -21,7 +21,7 @@ include(CTest)
set(OSTAP_VERSION_MAJOR 1)
set(OSTAP_VERSION_MINOR 9)
set(OSTAP_VERSION_PATCH 2)
set(OSTAP_VERSION_TWEAK 1)
set(OSTAP_VERSION_TWEAK 2)

set(OSTAP_VERSION ${OSTAP_VERSION_MAJOR}.${OSTAP_VERSION_MINOR}.${OSTAP_VERSION_PATCH}.${OSTAP_VERSION_TWEAK})

3 changes: 3 additions & 0 deletions ReleaseNotes/release_notes.md
@@ -1,4 +1,7 @@
# v1.9.2.2

## New features:
1. add option `parallel` for `Data` and similar classes

## Backward incompatible:

9 changes: 9 additions & 0 deletions ReleaseNotes/v1.9.2.2.md
@@ -0,0 +1,9 @@
# v1.9.2.2

## New features:

1. add option `parallel` for `Data` and similar classes

## Backward incompatible:

## Bug fixes:
6 changes: 4 additions & 2 deletions ostap/contribs/lhcb/data.py
@@ -84,7 +84,8 @@ def __init__( self ,
lumi_chain = 'GetIntegratedLuminosity/LumiTuple' ,
maxfiles = 100000 ,
check = True ,
silent = False ) :
silent = False ,
parallel = False ) :

chain = chain if isinstance ( chain , str ) else chain.name
lumi_chain = lumi_chain if isinstance ( lumi_chain , str ) else lumi_chain.name
@@ -99,7 +100,8 @@ def __init__( self ,
description = description ,
maxfiles = maxfiles ,
check = check ,
silent = silent )
silent = silent ,
parallel = parallel )

# =========================================================================
@property
165 changes: 128 additions & 37 deletions ostap/trees/data_utils.py
@@ -93,6 +93,26 @@ def __call__ ( self , items ) :
data = self.data.clone()
data.add_files ( items )
return data

from ostap.parallel.task import Task

# =============================================================================
## @class DataTask
class DataTask(Task) :
def __init__ ( self , data ) :
self.__data = data.clone ( files = [] )
def initialize_local ( self ) : pass
def initialize_remote ( self , jobid = -1 ) : pass
## actual processing
def process ( self , jobid , files ) :
"""Actual processing"""
for f in files : self.__data.treatFile ( f )
return self.__data
## get the results
def results ( self ) : return self.__data
## merge the results
def merge_results ( self , results , jobid = -1 ) :
self.__data = self.__data + results
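
For orientation, the `Task` protocol that `DataTask` implements is driven by `WorkManager.process` roughly as in the single-process sketch below. This only condenses the call that appears further down in `Files.__init__`; it is not an additional API, and in a real run each job works on its own copy of the task:

```python
# Conceptual, single-process sketch of how a Task such as DataTask is driven;
# the real WorkManager ships copies of the task to worker processes and merges
# whatever they return.
def run_sequentially ( task , jobs ) :
    for jobid , files in enumerate ( jobs ) :    # 'jobs' is a list of file-name chunks
        partial = task.process ( jobid , files ) # fill an (initially empty) clone
        task.merge_results ( partial , jobid )   # merge the partial collection back
    return task.results ()                       # the merged collection
```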

# =============================================================================
## @class Files
@@ -105,23 +125,26 @@ class Files(object):
>>> data = Files( '*.root' )
>>> files = data.files
"""
def __init__( self ,
files ,
description = "" ,
maxfiles = -1 ,
silent = False ) :
def __init__( self ,
files ,
description = "" ,
maxfiles = -1 ,
silent = False ,
parallel = False ) :
#
if isinstance ( files , str ) : files = [ files ]
elif isinstance ( files , Files ) : files = files.files
#
#

self.__description = description
self.__silent = silent

from copy import deepcopy
self.__patterns = tuple ( sorted ( set ( files ) ) )

assert isinstance ( maxfiles , int ) , "Invalid type for 'maxfiles'!"

self.__maxfiles = maxfiles
self.__files = []

# =====================================================================
@@ -133,17 +156,42 @@ def __init__( self ,
if not self.silent :
logger.info ('Loading: %s #patterns/files: %s/%d' % ( self.description ,
len(self.patterns) ,
len( _files ) ) )
self.add_files ( _files , maxfiles )

if not self.silent :
logger.info ('Loaded: %s' % self )
len( _files ) ) )

## can use parallel processing here
nfiles = len ( _files )

chunk_size = min ( 20 , nfiles // 3 )
if parallel and chunk_size < nfiles and ( maxfiles < 0 or nfiles <= maxfiles ) :

jobs = []
from ostap.utils.utils import chunked
for chunk in chunked ( _files , chunk_size ) : jobs.append ( chunk )

psilent = self.silent
self.silent = True

task = DataTask ( self )
from ostap.parallel.parallel import WorkManager
wmgr = WorkManager ( silent = True , progress = not self.silent )

wmgr.process ( task , jobs )

results = task.results()
self += results
self.silent = psilent

else :

self.__add_files ( _files , maxfiles )

if not self.silent : logger.info ('Loaded: %s' % self )
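
To make the chunking above concrete: the matched files are split into chunks of `min(20, nfiles // 3)` entries, each chunk becomes one job for the `DataTask`, and the partial collections are merged back via `self += results`. A small worked example of that arithmetic (assuming `chunked` splits a sequence into consecutive pieces of at most the given size, as the loop above uses it):

```python
# Worked example of the chunk-size rule used above (the file counts are illustrative).
def n_jobs ( nfiles ) :
    chunk_size = min ( 20 , nfiles // 3 )             # same rule as in Files.__init__
    if chunk_size < 1 : return 1                      # too few files to split at all
    return ( nfiles + chunk_size - 1 ) // chunk_size  # ceil ( nfiles / chunk_size )

for nfiles in ( 10 , 60 , 100 , 1000 ) :
    print ( nfiles , '->' , n_jobs ( nfiles ) )       # 10 -> 4, 60 -> 3, 100 -> 5, 1000 -> 50
```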

@property
def files ( self ) :
"""``files'' : the list of files"""
return tuple ( self.__files )

@property
def description ( self ) :
"""``description'': description of this collection"""
@@ -172,7 +220,12 @@ def verbose ( self ) :
@verbose.setter
def verbose ( self , value ) :
self.__silent = False if value else True


@property
def maxfiles ( self ) :
"""`maxfiles' : maximal number of files to collect"""
return self.__maxfiles

# =========================================================================
## check if the file is a part of collection
# @code
@@ -212,7 +265,7 @@ def the_files ( self ) :

# =========================================================================
## add files
def add_files ( self , files , max_files = -1 ) :
def __add_files ( self , files , max_files = -1 ) :
""" Add files/patterns to data collector
"""

@@ -236,25 +289,24 @@ def add_files ( self , files , max_files = -1 ) :

## the specific action for each file
def treatFile ( self, the_file ) :
if not the_file in self.__files : self.__files.append ( the_file )

if not the_file in self.__files :
self.__files = tuple ( self.__files ) + ( the_file , )

# ===============================================================================
## clone it!
def clone ( self ,
files = None ,
description = None ,
patterns = None ) :
description = None ) :
""" Clone the object
"""
import copy
result = copy.copy ( self )
if not files is None :
if isinstance ( files , str ) : files = files ,
result.__files = tuple ( files )
result.__files = [ f for f in files ]
result.__patterns = ()
if not description is None :
result.__descrfiption = str ( description )
if not patterns is None :
result.__patterns = tuple ( patterns )
result.__description = str ( description )

return result

@@ -295,7 +347,7 @@ def __or__ ( self , other ) :
files = self.files + tuple ( f for f in other.files if not f in self.files )
description = "|".join ( "(%s)" for s in ( self.description , other.description ) )

return self.clone ( files = files , description = description , patterns = () )
return self.clone ( files = files , description = description )

## get an intersection of two datasets
def __and__ ( self , other ) :
@@ -310,9 +362,9 @@ def __and__ ( self , other ) :
files = tuple ( f for f in self.files if f in other.files )
description = "&".join ( "(%s)" for s in ( self.description , other.description ) )

return self.clone ( files = files , description = description , patterns = () )
return self.clone ( files = files , description = description )

## get an exclusive OR for two datasets
def __xor__ ( self , other ) :
""" get an exclusive OR for two sets
>>> ds1 = ...
@@ -327,12 +379,31 @@ def __xor__ ( self , other ) :

description = "^".join ( "(%s)" for s in ( self.description , other.description ) )

return self.clone ( files = files , description = description , patterns = () )

return self.clone ( files = files , description = description )

__add__ = __or__
__mul__ = __and__


## append with another dataset
def __ior__ ( self , other ) :
""" Append with another dataset
>>> ds1 = ...
>>> ds2 = ...
>>> ds |= ds2
>>> ds += ds2 ## ditto
"""
if not self.check_ops ( other ) : return NotImplemented

files = self.files + tuple ( f for f in other.files if not f in self.files )

self.__files = list ( files )
self.__description = "|".join ( "(%s)" for s in ( self.description , other.description ) )

return self

__iadd__ = __ior__


## get union of two datasets
def union ( self , other ) :
@@ -366,14 +437,30 @@ def __sub__ ( self , other ) :
files = tuple ( f for f in self.files if not f in other.files )
description = "-".join ( "(%s)" for s in ( self.description , other.description ) )

return self.clone ( files = files , description = description , patterns = () )
return self.clone ( files = files , description = description )

## remove the files from another dataset
def __isub__ ( self , other ) :
""" Remove file forom anothere dataset
>>> ds1 = ...
>>> ds2 = ...
>>> ds1 -= ds2 ## get subtraction
"""
if not isinstance ( other , Files ) : return NotImplemented

files = tuple ( f for f in self.files if not f in other.files )

self.__files = list ( files )
self.__description = "-".join ( "(%s)" for s in ( self.description , other.description ) )

return self


# =========================================================================
## get a sample of at most n-elements (if n is integer and >=1 ) or n-fraction
def sample_files ( self , n , sort ) :
"""get a sample of at most n-elements (if n is integer and >=1 ) or n-fraction
"""

if isinstance ( n , int ) and 1 <= n <= len ( self.files ) :
files = random.sample ( self.files , n )
if sort : files.sort()
@@ -422,7 +509,7 @@ def __getitem__ ( self , item ) :
else :
description = "%s: %s" % ( item , self.description )

return self.clone ( files = files , description = description , patterns = () )
return self.clone ( files = files , description = description )

## printout
def __str__(self):
@@ -583,7 +670,7 @@ def copy_files ( self , new_dir , parallel = False ) :
copied.append ( result )

copied = tuple ( copied )
return self.clone ( files = copied , patterns = () )
return self.clone ( files = copied )

# =============================================================================
## @class Data
Expand All @@ -603,7 +690,8 @@ def __init__( self ,
description = '' ,
maxfiles = -1 ,
check = True ,
silent = False ) :
silent = False ,
parallel = False ) :

## we will need Ostap machinery for trees&chains here
import ostap.trees.trees
@@ -623,7 +711,7 @@ def __init__( self ,
if not description : description = "ROOT.TChain(%s)" % self.chain_name

## initialize the base class
Files.__init__( self , files , description , maxfiles , silent = silent )
Files.__init__( self , files , description , maxfiles , silent = silent , parallel = parallel )

@property
def validate ( self ) :
@@ -686,7 +774,7 @@ def treatFile ( self, the_file ) :
## suppress Warning/Error messages from ROOT
with rootError() :

## new temporary chani/tree
## new temporary chain/tree for this file
tree = ROOT.TChain ( self.chain_name )
tree.Add ( the_file )

@@ -760,7 +848,8 @@ def __init__( self ,
description = '' ,
maxfiles = -1 ,
check = True ,
silent = False ) :
silent = False ,
parallel = False ) :

## decorate files
if isinstance ( files , str ) : files = [ files ]
@@ -777,12 +866,14 @@ def __init__( self ,
description = "%s&%s" % ( description , self.chain2.GetName() )

Data.__init__( self ,
chain = chain1 ,
files = files ,
description = description ,
maxfiles = maxfiles ,
check = check ,
silent = silent )
silent = silent ,
parallel = parallel )


@property
def files1 ( self ) :