-
Notifications
You must be signed in to change notification settings - Fork 19
/
CEP.py
62 lines (56 loc) · 2.89 KB
/
CEP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""
This file contains the main class of the project. It processes streams of events and detects pattern matches
by invoking the rest of the system components.
"""
import time
from datetime import datetime
from typing import List, Optional, Union

from base.DataFormatter import DataFormatter
from parallel.EvaluationManagerFactory import EvaluationManagerFactory
from parallel.ParallelExecutionParameters import ParallelExecutionParameters
from stream.Stream import InputStream, OutputStream
from base.Pattern import Pattern
from evaluation.EvaluationMechanismFactory import EvaluationMechanismParameters
from transformation.PatternPreprocessingParameters import PatternPreprocessingParameters
from transformation.PatternPreprocessor import PatternPreprocessor
class CEP:
    """
    A CEP object wraps the engine responsible for actual processing. It accepts the desired workload (list of patterns
    to be evaluated) and a set of settings defining the evaluation mechanism to be used and the way the workload should
    be optimized and parallelized.
    """

    def __init__(self, patterns: Union[Pattern, List[Pattern]],
                 eval_mechanism_params: Optional[EvaluationMechanismParameters] = None,
                 parallel_execution_params: Optional[ParallelExecutionParameters] = None,
                 pattern_preprocessing_params: Optional[PatternPreprocessingParameters] = None):
        """
        Constructor of the class.

        The given patterns are first passed through a PatternPreprocessor, and the transformed patterns are then
        handed to EvaluationManagerFactory to build the evaluation manager used by all other methods.

        :param patterns: a single pattern or a list of patterns forming the workload.
        :param eval_mechanism_params: settings forwarded as-is to the evaluation manager factory.
        :param parallel_execution_params: settings forwarded as-is to the evaluation manager factory.
        :param pattern_preprocessing_params: settings forwarded as-is to the pattern preprocessor.

        NOTE(review): how a None value is interpreted for each settings object is decided by the receiving
        component, not here - presumably each falls back to its own defaults; confirm against those classes.
        """
        actual_patterns = PatternPreprocessor(pattern_preprocessing_params).transform_patterns(patterns)
        self.__evaluation_manager = EvaluationManagerFactory.create_evaluation_manager(actual_patterns,
                                                                                       eval_mechanism_params,
                                                                                       parallel_execution_params)

    def run(self, events: InputStream, matches: OutputStream, data_formatter: DataFormatter) -> float:
        """
        Applies the evaluation mechanism to detect the predefined patterns in a given stream of events.
        Returns the total time elapsed during evaluation, in seconds.

        :param events: the input stream of events to evaluate.
        :param matches: the output stream passed to the evaluation manager together with the events.
        :param data_formatter: the formatter passed to the evaluation manager together with the events.
        """
        # A monotonic performance counter is used instead of the wall clock, so that the measurement
        # cannot be skewed by system clock adjustments or a daylight-saving transition mid-run.
        start = time.perf_counter()
        self.__evaluation_manager.eval(events, matches, data_formatter)
        return time.perf_counter() - start

    def get_pattern_match(self):
        """
        Returns one match from the output stream, or None if the stream signals that it is exhausted
        by raising StopIteration.
        """
        try:
            return self.get_pattern_match_stream().get_item()
        except StopIteration:  # the stream might be closed.
            return None

    def get_pattern_match_stream(self):
        """
        Returns the output stream containing the detected matches, as provided by the evaluation manager.
        """
        return self.__evaluation_manager.get_pattern_match_stream()

    def get_evaluation_mechanism_structure_summary(self):
        """
        Returns an object summarizing the structure of the underlying evaluation mechanism,
        as provided by the evaluation manager.
        """
        return self.__evaluation_manager.get_structure_summary()