Skip to content

Commit

Permalink
SimultaneousEvent: Add concatenation methods
Browse files Browse the repository at this point in the history
Before this patch it was very difficult to concatenate two
SimultaneousEvent along their time axis. It was only easily possible to
extend a SimultaneousEvent with another SimultaneousEvent by adding more
simultaneous layers via the '.extend' method.

With this patch it becomes easy to extend each child of a
SimultaneousEvent with the children of a similar second
SimultaneousEvent. The new methods 'concatenate_by_index' and
'concatenate_by_tag' find pairs of events in the original and in new
SimultaneousEvent according to their common tag or to their common
index and concatenates the pairs. The concatenation is mostly safe
to use, because it ensures that even slightly different SimultaneousEvent
(e.g. one of them has more children than the other) still get concatenated
in meaningful ways.
  • Loading branch information
levinericzimmermann committed Nov 30, 2022
1 parent ad282e0 commit cf678e2
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 1 deletion.
97 changes: 96 additions & 1 deletion mutwo/core_events/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ def extend_until(
typing.Callable[[core_parameters.abc.Duration], core_events.abc.Event]
] = None,
prolong_simple_event: bool = True,
) -> SequentialEvent:
) -> SequentialEvent[T]:
duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(duration)
duration_to_white_space = (
duration_to_white_space
Expand All @@ -723,6 +723,27 @@ def extend_until(
class SimultaneousEvent(core_events.abc.ComplexEvent, typing.Generic[T]):
"""Event-Object which contains other Event-Objects which happen at the same time."""

# ###################################################################### #
# private static methods #
# ###################################################################### #

@staticmethod
def _extend_ancestor(
ancestor: core_events.abc.Event, event: core_events.abc.Event
):
match ancestor:
case core_events.SequentialEvent():
ancestor.extend(event)
case core_events.SimultaneousEvent():
try:
ancestor.concatenate_by_tag(event)
except core_utilities.NoTagError:
ancestor.concatenate_by_index(event)
# We can't concatenate to a simple event.
# We also can't concatenate to anything else.
case _:
raise core_utilities.ConcatenationError(ancestor, event)

# ###################################################################### #
# properties #
# ###################################################################### #
Expand Down Expand Up @@ -839,6 +860,80 @@ def extend_until(
else:
raise core_utilities.ImpossibleToExtendUntilError(event)

@core_utilities.add_copy_option
def concatenate_by_index(self, other: SimultaneousEvent) -> SimultaneousEvent:
"""Concatenate with other :class:`~mutwo.core_events.SimultaneousEvent` along their indices.
:param other: The other `SimultaneousEvent` with which to concatenate.
The other `SimultaneousEvent` can contain more or less events.
:type other: SimultaneousEvent
:param mutate: If ``False`` the function will return a copy of the given object.
If set to ``True`` the object itself will be changed and the function will
return the changed object. Default to ``True``.
:type mutate: bool
:raises core_utilities.ConcatenationError: If there are any :class:`SimpleEvent`
inside a :class:`SimultaneousEvent`.
**Example:**
>>> from mutwo import core_events
>>> s = core_events.SimultaneousEvent(
>>> [core_events.SequentialEvent([core_events.SimpleEvent(1)])]
>>> )
>>> s.concatenate_by_index(s)
SimultaneousEvent([SequentialEvent([SimpleEvent(duration = DirectDuration(duration = 1)), SimpleEvent(duration = DirectDuration(duration = 1))])])
"""
self_duration = self.duration
self.extend_until(self_duration)
for index, event in enumerate(other.copy()):
try:
ancestor = self[index]
except IndexError:
event.slide_in(0, core_events.SimpleEvent(self_duration))
self.append(event)
else:
self._extend_ancestor(ancestor, event)

@core_utilities.add_copy_option
def concatenate_by_tag(self, other: SimultaneousEvent) -> SimultaneousEvent:
"""Concatenate with other :class:`~mutwo.core_events.SimultaneousEvent` along their tags.
:param other: The other `SimultaneousEvent` with which to concatenate.
The other `SimultaneousEvent` can contain more or less events.
:type other: SimultaneousEvent
:param mutate: If ``False`` the function will return a copy of the given object.
If set to ``True`` the object itself will be changed and the function will
return the changed object. Default to ``True``.
:type mutate: bool
:return: Concatenated event.
:raises core_utilities.NoTagError: If any child event doesn't have a 'tag'
attribute.
:raises core_utilities.ConcatenationError: If there are any :class:`SimpleEvent`
inside a :class:`SimultaneousEvent`.
**Example:**
>>> from mutwo import core_events
>>> s = core_events.SimultaneousEvent(
>>> [core_events.TaggedSequentialEvent([core_events.SimpleEvent(1)], tag="test")]
>>> )
>>> s.concatenate_by_tag(s)
SimultaneousEvent([TaggedSequentialEvent([SimpleEvent(duration = DirectDuration(duration = 1)), SimpleEvent(duration = DirectDuration(duration = 1))])])
"""
self_duration = self.duration
self.extend_until(self_duration)
for tagged_event in other.copy():
if not hasattr(tagged_event, "tag"):
raise core_utilities.NoTagError(tagged_event)
tag = tagged_event.tag
try:
ancestor = self[tag]
except KeyError:
tagged_event.slide_in(0, core_events.SimpleEvent(self_duration))
self.append(tagged_event)
else:
self._extend_ancestor(ancestor, tagged_event)


@core_utilities.add_tag_to_class
class TaggedSimpleEvent(SimpleEvent):
Expand Down
25 changes: 25 additions & 0 deletions mutwo/core_utilities/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"NoSolutionFoundError",
"EmptyEnvelopeError",
"UndefinedReferenceWarning",
"ConcatenationError",
"NoTagError",
)


Expand Down Expand Up @@ -136,3 +138,26 @@ def __init__(self, tempo_point: typing.Any):
"doesn't know attribute 'reference'."
" Therefore reference has been set to 1."
)


class ConcatenationError(TypeError):
def __init__(self, ancestor, event):
super().__init__(
f"Can't concatenate event '{event}' to event '{ancestor}' "
f"of type '{type(ancestor)}'. It is only possible to"
" concatenate a new event to events which are instances of "
"SequentialEvent or SimultaneousEvent. To fix this bug you can"
f" put your event '{ancestor}' inside a SequentialEvent or"
" a SimultaneousEvent."
)


class NoTagError(Exception):
def __init__(self, event_without_tag):
super().__init__(
"It's not possible to concatenate an event "
"with the 'concatenate_by_tag' method if not "
"all child events have tags. Here 'mutwo' detected the "
f"child event '{str(event_without_tag)[:50]}...' "
"which doesn't have any 'tag' attribute."
)
96 changes: 96 additions & 0 deletions tests/events/basic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,102 @@ def test_extend_until(self):
# Check default value for SimultaneousEvent
self.assertEqual(si([s(1), s(3)]).extend_until(), si([s(3), s(3)]))

def test_concatenate_by_index(self):
s, se, si = (
core_events.SimpleEvent,
core_events.SequentialEvent,
core_events.SimultaneousEvent,
)

# Equal size concatenation
self.assertEqual(
self.nested_sequence.concatenate_by_index(
self.nested_sequence, mutate=False
),
si(
[
se([s(1), s(2), s(3), s(1), s(2), s(3)]),
se([s(1), s(2), s(3), s(1), s(2), s(3)]),
]
),
)

# Smaller self
si_test = si([se([s(1), s(1)])])
si_ok = si(
[
se([s(1), s(1), s(1), s(2), s(3)]),
se([s(2), s(1), s(2), s(3)]),
]
)
self.assertEqual(si_test.concatenate_by_index(self.nested_sequence), si_ok)
# Mutate inplace!
self.assertEqual(si_test, si_ok)

# Smaller other
si_test = si([se([s(1), s(1)]), se([s(0.5)]), se([s(2)])])
si_ok = si(
[
se([s(1), s(1), s(1), s(2), s(3)]),
se([s(0.5), s(1.5), s(1), s(2), s(3)]),
se([s(2)]),
]
)
self.assertEqual(si_test.concatenate_by_index(self.nested_sequence), si_ok)

def test_concatenate_by_index_exception(self):
self.assertRaises(
core_utilities.ConcatenationError,
self.sequence.concatenate_by_index,
self.sequence,
)

def test_concatenate_by_tag(self):
s, tse, si = (
core_events.SimpleEvent,
core_events.TaggedSequentialEvent,
core_events.SimultaneousEvent,
)

s1 = si([tse([s(1), s(1)], tag="a")])
s2 = si([tse([s(2), s(1)], tag="a"), tse([s(0.5)], tag="b")])

# Equal size concatenation
self.assertEqual(
s1.concatenate_by_tag(s1, mutate=False),
si([tse([s(1), s(1), s(1), s(1)], tag="a")]),
)

# Smaller self
s2.reverse() # verify order doesn't matter
self.assertEqual(
s1.concatenate_by_tag(s2, mutate=False),
si([tse([s(1), s(1), s(2), s(1)], tag="a"), tse([s(2), s(0.5)], tag="b")]),
)

# Smaller other
s2.reverse() # reverse to original order
self.assertEqual(
s2.concatenate_by_tag(s1, mutate=False),
si(
[tse([s(2), s(1), s(1), s(1)], tag="a"), tse([s(0.5), s(2.5)], tag="b")]
),
)

def test_concatenate_by_tag_exception(self):
self.assertRaises(
core_utilities.NoTagError,
self.sequence.concatenate_by_tag,
self.sequence,
)

s1 = core_events.SimultaneousEvent([core_events.TaggedSimpleEvent(1, tag="a")])
self.assertRaises(
core_utilities.ConcatenationError,
s1.concatenate_by_tag,
s1,
)


if __name__ == "__main__":
unittest.main()

0 comments on commit cf678e2

Please sign in to comment.