"""Generic event classes which can be used in multiple contexts.
The different events differ in their timing structure and whether they
are nested or not:
"""
from __future__ import annotations
import bisect
import copy
import functools
import operator
import types
import typing
import ranges
from mutwo import core_constants
from mutwo import core_events
from mutwo import core_parameters
from mutwo import core_utilities
__all__ = (
"SimpleEvent",
"Consecution",
"Concurrence",
"TaggedSimpleEvent",
"TaggedConsecution",
"TaggedConcurrence",
)
[docs]class SimpleEvent(core_events.abc.Event):
"""Event-Object which doesn't contain other Event-Objects (the node or leaf).
:param duration: The duration of the ``SimpleEvent``. Mutwo will convert
the incoming object to a :class:`mutwo.core_parameters.abc.Duration` object
with the global `core_events.configurations.UNKNOWN_OBJECT_TO_DURATION`
callable.
**Example:**
>>> from mutwo import core_events
>>> chronon = core_events.SimpleEvent(2)
>>> print(chronon)
SimpleEvent(duration = DirectDuration(duration = 2))
"""
parameter_to_exclude_from_representation_tuple = ("tempo_envelope",)
def __init__(
    self,
    duration: core_parameters.abc.Duration,
    tempo_envelope: typing.Optional[core_events.TempoEnvelope] = None,
):
    """Initialise the event with a duration and an optional tempo envelope.

    :param duration: Assigned through the ``duration`` property, whose
        setter converts the value with
        ``core_events.configurations.UNKNOWN_OBJECT_TO_DURATION``.
    :param tempo_envelope: Forwarded unchanged to the parent initialiser.
    """
    # The parent class receives the tempo envelope first ...
    super().__init__(tempo_envelope)
    # ... then the duration goes through the converting property setter.
    self.duration = duration
# ###################################################################### #
# magic methods #
# ###################################################################### #
def __eq__(self, other: typing.Any) -> bool:
"""Test for checking if two objects are equal."""
try:
parameter_to_compare_set = set([])
for object_ in (self, other):
for parameter_to_compare in object_._parameter_to_compare_tuple:
parameter_to_compare_set.add(parameter_to_compare)
except AttributeError:
return False
return core_utilities.test_if_objects_are_equal_by_parameter_tuple(
self, other, tuple(parameter_to_compare_set)
)
def __repr__(self) -> str:
attribute_iterator = (
"{} = {}".format(attribute, getattr(self, attribute))
for attribute in self._parameter_to_print_tuple
)
return "{}({})".format(type(self).__name__, ", ".join(attribute_iterator))
# ###################################################################### #
# private methods #
# ###################################################################### #
@core_utilities.add_copy_option
def _set_parameter(
    self,
    parameter_name: str,
    object_or_function: typing.Callable[
        [core_constants.ParameterType], core_constants.ParameterType
    ]
    | core_constants.ParameterType,
    set_unassigned_parameter: bool,
    id_set: set[int],
) -> SimpleEvent:
    """Assign a new value to the attribute ``parameter_name``.

    :param parameter_name: Name of the attribute to set.
    :param object_or_function: Either the new value, or a callable which
        receives the old value and returns the new one.
    :param set_unassigned_parameter: If ``False`` the attribute is only
        written when its current value (as returned by ``get_parameter``)
        is not ``None``.
    :param id_set: Not read by this implementation.
    """
    old_parameter = self.get_parameter(parameter_name)
    if not set_unassigned_parameter and old_parameter is None:
        return
    # 'callable' looks the call protocol up on the type, exactly like the
    # call below does. 'hasattr(x, "__call__")' would also accept objects
    # which only carry an instance attribute named '__call__' and would
    # then fail when called.
    if callable(object_or_function):
        new_parameter = object_or_function(old_parameter)
    else:
        new_parameter = object_or_function
    setattr(self, parameter_name, new_parameter)
@core_utilities.add_copy_option
def _mutate_parameter(
    self,
    parameter_name: str,
    function: typing.Callable[[core_constants.ParameterType], None] | typing.Any,
    id_set: set[int],
) -> SimpleEvent:
    """Call ``function`` on the current value of ``parameter_name``.

    Nothing happens if ``get_parameter`` returns ``None`` for the name.

    :param parameter_name: Name of the attribute whose value is mutated.
    :param function: Callable which receives the current value.
    :param id_set: Not read by this implementation.
    """
    current_value = self.get_parameter(parameter_name)
    if current_value is None:
        return
    function(current_value)
# ###################################################################### #
# properties #
# ###################################################################### #
@property
def _parameter_to_print_tuple(self) -> tuple[str, ...]:
"""Return tuple of attribute names which shall be printed for repr."""
# Fix infinite circular loop (due to 'tempo_envelope')
# and avoid printing too verbose parameters.
return tuple(
filter(
lambda attribute: attribute
not in self.parameter_to_exclude_from_representation_tuple,
self._parameter_to_compare_tuple,
)
)
@property
def _parameter_to_compare_tuple(self) -> tuple[str, ...]:
"""Return tuple of attribute names which values define the :class:`SimpleEvent`.
The returned attribute names are used for equality check between two
:class:`SimpleEvent` objects.
"""
return tuple(
attribute
for attribute in dir(self)
# We have to use 'and' (lazy evaluation) instead of
# 'all', to avoid redundant checks!
#
# no private attributes
if attribute[0] != "_"
# no redundant comparisons
and attribute not in ("parameter_to_exclude_from_representation_tuple",)
# no methods
and not isinstance(getattr(self, attribute), types.MethodType)
)
@property
def duration(self) -> core_parameters.abc.Duration:
    """Duration of the event, as stored by the setter."""
    return self._duration


@duration.setter
def duration(self, duration: core_parameters.abc.Duration):
    # Every incoming object is passed through the globally configured
    # converter before it is stored.
    converted_duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(
        duration
    )
    self._duration = converted_duration
# ###################################################################### #
# public methods #
# ###################################################################### #
def destructive_copy(self) -> SimpleEvent:
    """Return a deep copy of the event (made with ``copy.deepcopy``)."""
    duplicate = copy.deepcopy(self)
    return duplicate
def get_parameter(
    self, parameter_name: str, flat: bool = False, filter_undefined: bool = False
) -> core_constants.ParameterType:
    """Return the attribute ``parameter_name`` or ``None`` if it is missing.

    :param parameter_name: Name of the attribute to look up.
    :param flat: Accepted, but not read by this implementation.
    :param filter_undefined: Accepted, but not read by this implementation.
    """
    try:
        return getattr(self, parameter_name)
    except AttributeError:
        return None
# Update docstring
def set_parameter(  # type: ignore
    self,
    *args,
    **kwargs,
) -> SimpleEvent:
    """Sets event parameter to new value.

    This override only replaces the docstring; all arguments are forwarded
    unchanged to the parent implementation.

    :param parameter_name: The name of the parameter whose value shall be
        changed.
    :param object_or_function: For setting the parameter either a new value
        can be passed directly or a function can be passed. The function
        gets as an argument the previous value that had been assigned to the
        respective object and has to return a new value that will be
        assigned to the object.
    :param set_unassigned_parameter: If set to ``False`` a new parameter will
        only be assigned to an Event if the Event already has an attribute
        with the respective `parameter_name`. If the Event doesn't know the
        attribute yet and `set_unassigned_parameter` is False, the method
        call will simply be ignored.
    :param mutate: If ``False`` the function will return a copy of the given
        object. If set to ``True`` the object itself will be changed and the
        function will return the changed object. Default to ``True``.

    **Example:**

    >>> from mutwo import core_events
    >>> chronon = core_events.SimpleEvent(2)
    >>> chronon.set_parameter(
    ...     'duration', lambda old_duration: old_duration * 2
    ... )
    SimpleEvent(duration = DirectDuration(duration = 4))
    >>> chronon.duration
    DirectDuration(4)
    >>> chronon.set_parameter('duration', 3)
    SimpleEvent(duration = DirectDuration(duration = 3))
    >>> chronon.duration
    DirectDuration(3)
    >>> chronon.set_parameter(
    ...     'unknown_parameter', 10, set_unassigned_parameter=False
    ... )  # this will be ignored
    SimpleEvent(duration = DirectDuration(duration = 3))
    >>> chronon.unknown_parameter
    Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
    AttributeError: 'SimpleEvent' object has no attribute 'unknown_parameter'
    >>> chronon.set_parameter(
    ...     'unknown_parameter', 10, set_unassigned_parameter=True
    ... )  # this will be written
    SimpleEvent(duration = DirectDuration(duration = 3), unknown_parameter = 10)
    >>> chronon.unknown_parameter
    10
    """
    changed_event = super().set_parameter(*args, **kwargs)
    return changed_event
def metrize(self, mutate: bool = True) -> SimpleEvent:
    """Return the metrized event, optionally writing it back into ``self``.

    :param mutate: If ``True`` the duration and tempo envelope of the
        metrized event are copied onto ``self`` and ``self`` is returned.
        If ``False`` the metrized event itself is returned and ``self`` is
        not assigned to.
    """
    metrized_event = self._event_to_metrized_event(self)
    if not mutate:
        return metrized_event
    self.duration = metrized_event.duration
    self.tempo_envelope = metrized_event.tempo_envelope
    return self
@core_utilities.add_copy_option
def cut_out(  # type: ignore
    self,
    start: core_parameters.abc.Duration,
    end: core_parameters.abc.Duration,
) -> SimpleEvent:
    """Shorten the event to the part between ``start`` and ``end``.

    :param start: Start of the range to keep (converted to a duration).
    :param end: End of the range to keep (converted to a duration).
    :raises core_utilities.InvalidCutOutStartAndEndValuesError: If the
        amount to remove is at least as large as the event's duration.
    """
    to_duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION
    start, end = to_duration(start), to_duration(end)
    self._assert_valid_absolute_time(start)
    self._assert_correct_start_and_end_values(
        start, end, condition=lambda start, end: start < end
    )
    duration = self.duration
    # Sum up what has to be removed before 'start' and after 'end'.
    duration_to_remove: core_parameters.DirectDuration = (
        core_parameters.DirectDuration(0)
    )
    if start > 0:
        duration_to_remove += start
    if end < duration:
        duration_to_remove += duration - end
    if duration_to_remove >= duration:
        raise core_utilities.InvalidCutOutStartAndEndValuesError(
            start, end, self, duration
        )
    self.duration -= duration_to_remove
@core_utilities.add_copy_option
def cut_off(  # type: ignore
    self,
    start: core_parameters.abc.Duration,
    end: core_parameters.abc.Duration,
) -> SimpleEvent:
    """Remove the part between ``start`` and ``end`` from the duration.

    The event is left unchanged if ``start`` isn't smaller than its
    duration. An ``end`` beyond the duration is clipped to the duration.

    :param start: Start of the range to remove (converted to a duration).
    :param end: End of the range to remove (converted to a duration).
    """
    to_duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION
    start, end = to_duration(start), to_duration(end)
    self._assert_valid_absolute_time(start)
    self._assert_correct_start_and_end_values(start, end)
    duration = self.duration
    if not start < duration:
        return
    clipped_end = duration if end > duration else end
    self.duration -= clipped_end - start
T = typing.TypeVar("T", bound=core_events.abc.Event)
[docs]class Consecution(core_events.abc.ComplexEvent, typing.Generic[T]):
"""Event-Object which contains other Events which happen in a linear order."""
# ###################################################################### #
# magic methods #
# ###################################################################### #
def __add__(self, event: list[T]) -> Consecution[T]:
e = self.copy()
e._concatenate_tempo_envelope(event)
e.extend(event)
return e
# ###################################################################### #
# private static methods #
# ###################################################################### #
@staticmethod
def _get_index_at_from_absolute_time_tuple(
absolute_time: float,
absolute_time_tuple: float,
duration: float,
) -> typing.Optional[int]:
if absolute_time < duration and absolute_time >= 0:
return bisect.bisect_right(absolute_time_tuple, absolute_time) - 1
else:
return None
# ###################################################################### #
# private methods #
# ###################################################################### #
# We need to have a private "_cut_off" method to simplify
# overriding the public "cut_off" method in children classes
# of Consecution. This is necessary, because the implementation
# of "squash_in" makes use of "_cut_off". In this way it is possible
# to adjust the meaning of the public "cut_off" method, without
# having to change the meaning of "squash_in" (this happens for instance
# in the mutwo.core_events.Envelope class).
def _cut_off(
    self,
    start: core_parameters.abc.Duration,
    end: core_parameters.abc.Duration,
    cut_off_duration: typing.Optional[core_parameters.abc.Duration] = None,
) -> Consecution[T]:
    """Remove the range between ``start`` and ``end`` in place.

    Children which lie completely inside the range are deleted, children
    which overlap the range are shortened with their own ``cut_off``.

    :param start: Start of the range to remove.
    :param end: End of the range to remove.
    :param cut_off_duration: ``end - start``; computed if not given.
    :return: ``self``.
    """
    if cut_off_duration is None:
        cut_off_duration = end - start
    index_to_delete_list = []
    start_time_tuple = self.absolute_time_tuple
    # The end of each child is the start of its successor. The last child
    # has no successor, so its end is computed from its duration below.
    end_time_tuple = start_time_tuple[1:] + (None,)
    for child_index, (child_start, child_end, child) in enumerate(
        zip(start_time_tuple, end_time_tuple, self)
    ):
        if child_end is None:
            child_end = child_start + child.duration
        # Child is only active within the cut_off range: delete it later.
        if child_start >= start and child_end <= end:
            index_to_delete_list.append(child_index)
        # Child starts before (or at) the range start and reaches it.
        elif child_start <= start and child_end >= start:
            offset = start - child_start
            child.cut_off(offset, offset + cut_off_duration)
        # Child starts inside the range and ends after it.
        elif child_start < end and child_end > end:
            offset = child_start - start
            child.cut_off(0, cut_off_duration - offset)
    # Delete from the back, so the remaining indices stay valid.
    for child_index in reversed(index_to_delete_list):
        del self[child_index]
    return self
def _split_child_at(
self,
absolute_time: core_parameters.abc.Duration | typing.Any,
absolute_time_in_floats_tuple: tuple[float, ...],
duration_in_floats: float,
) -> int:
absolute_time = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(
absolute_time
)
self._assert_valid_absolute_time(absolute_time)
absolute_time_in_floats = absolute_time.duration_in_floats
event_index = Consecution._get_index_at_from_absolute_time_tuple(
absolute_time_in_floats, absolute_time_in_floats_tuple, duration_in_floats
)
# If there is no event at the requested time, raise error
if event_index is None:
raise core_utilities.SplitUnavailableChildError(absolute_time)
# Only try to split child event at the requested time if there isn't
# a segregation already anyway
elif absolute_time_in_floats != absolute_time_in_floats_tuple[event_index]:
try:
end = absolute_time_in_floats_tuple[event_index + 1]
except IndexError:
end = duration_in_floats
difference = end - absolute_time_in_floats
split_event = self[event_index].split_at(difference)
split_event_count = len(split_event)
match split_event_count:
case 1:
pass
case 2:
self[event_index] = split_event[0]
self.insert(event_index, split_event[1])
case _:
raise RuntimeError("Unexpected event count!")
return event_index + 1
return event_index
# ###################################################################### #
# private properties #
# ###################################################################### #
@property
def _absolute_time_tuple_and_duration(
    self,
) -> tuple[tuple[core_parameters.abc.Duration, ...], core_parameters.abc.Duration]:
    """Return start time for each event and the end time of the last event.

    This property helps to improve performance of various functions
    which use both the duration and the absolute_time_tuple attribute,
    because the children's durations are only accumulated once.
    """
    # NOTE(review): presumably 'accumulate_from_n' yields the start value
    # followed by the running sums - confirm in core_utilities.
    accumulated_time_tuple = tuple(
        core_utilities.accumulate_from_n(
            (event.duration for event in self), core_parameters.DirectDuration(0)
        )
    )
    # All but the last entry are start times, the last entry is the total.
    return accumulated_time_tuple[:-1], accumulated_time_tuple[-1]
@property
def _absolute_time_in_floats_tuple_and_duration(
    self,
) -> tuple[tuple[float, ...], float]:
    """Return start time for each event and the end time of the last event.

    Same as the duration based variant, but computed from
    ``duration_in_floats`` and therefore returning plain floats.
    """
    float_duration_iterator = (event.duration.duration_in_floats for event in self)
    # Each accumulated value is rounded again: summing floats can
    # reintroduce floating point errors, which would lead to bad errors
    # later (for instance in core_utilities.scale).
    accumulated_time_tuple = tuple(
        core_utilities.round_floats(
            accumulated_time,
            core_parameters.configurations.ROUND_DURATION_TO_N_DIGITS,
        )
        for accumulated_time in core_utilities.accumulate_from_n(
            float_duration_iterator, 0
        )
    )
    return accumulated_time_tuple[:-1], accumulated_time_tuple[-1]
# ###################################################################### #
# properties #
# ###################################################################### #
@core_events.abc.ComplexEvent.duration.getter
def duration(self) -> core_parameters.abc.Duration:
    """Return the sum of the durations of all children.

    An empty ``Consecution`` has a duration of ``DirectDuration(0)``.
    """
    # Test for emptiness explicitly instead of catching the 'TypeError'
    # which 'reduce' raises for an empty iterable: the broad 'except'
    # would also hide a genuine 'TypeError' raised while adding two
    # durations and silently report a duration of 0.
    if not self:
        return core_parameters.DirectDuration(0)
    return functools.reduce(operator.add, (event.duration for event in self))
@property
def absolute_time_tuple(self) -> tuple[core_parameters.abc.Duration, ...]:
    """Return start time as :class:`core_parameters.abc.Duration` for each event."""
    start_time_tuple, _ = self._absolute_time_tuple_and_duration
    return start_time_tuple
@property
def absolute_time_in_floats_tuple(self) -> tuple[float, ...]:
    """Return start time as `float` for each event."""
    start_time_tuple, _ = self._absolute_time_in_floats_tuple_and_duration
    return start_time_tuple
@property
def start_and_end_time_per_event(
    self,
) -> tuple[ranges.Range, ...]:
    """Return start and end time for each event.

    Every child is represented by one ``ranges.Range`` built from two
    neighbouring entries of the accumulated durations.
    """
    boundary_tuple = tuple(
        core_utilities.accumulate_from_n(
            (event.duration for event in self), core_parameters.DirectDuration(0)
        )
    )
    range_list = []
    for event_start, event_end in zip(boundary_tuple, boundary_tuple[1:]):
        range_list.append(ranges.Range(event_start, event_end))
    return tuple(range_list)
# ###################################################################### #
# public methods #
# ###################################################################### #
def get_event_index_at(
    self, absolute_time: core_parameters.abc.Duration | typing.Any
) -> typing.Optional[int]:
    """Get index of event which is active at the passed absolute_time.

    :param absolute_time: The absolute time where the method shall search
        for the active event.
    :type absolute_time: core_parameters.abc.Duration | typing.Any
    :return: Index of event if there is any event at the requested absolute
        time and ``None`` if there isn't any event.

    **Example:**

    >>> from mutwo import core_events
    >>> consecution = core_events.Consecution([core_events.SimpleEvent(2), core_events.SimpleEvent(3)])
    >>> consecution.get_event_index_at(1)
    0
    >>> consecution.get_event_index_at(3)
    1
    >>> consecution.get_event_index_at(100)

    **Warning:**

    This method ignores events with duration == 0.
    """
    requested_time = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(
        absolute_time
    )
    requested_time_in_floats = requested_time.duration_in_floats
    start_time_tuple, total_duration = self._absolute_time_in_floats_tuple_and_duration
    return Consecution._get_index_at_from_absolute_time_tuple(
        requested_time_in_floats, start_time_tuple, total_duration
    )
def get_event_at(
    self, absolute_time: core_parameters.abc.Duration | typing.Any
) -> typing.Optional[T]:
    """Get event which is active at the passed absolute_time.

    :param absolute_time: The absolute time where the method shall search
        for the active event.
    :type absolute_time: core_parameters.abc.Duration | typing.Any
    :return: Event if there is any event at the requested absolute time
        and ``None`` if there isn't any event.

    **Example:**

    >>> from mutwo import core_events
    >>> consecution = core_events.Consecution([core_events.SimpleEvent(2), core_events.SimpleEvent(3)])
    >>> consecution.get_event_at(1)
    SimpleEvent(duration = DirectDuration(duration = 2))
    >>> consecution.get_event_at(3)
    SimpleEvent(duration = DirectDuration(duration = 3))
    >>> consecution.get_event_at(100)

    **Warning:**

    This method ignores events with duration == 0.
    """
    event_index = self.get_event_index_at(absolute_time)
    return None if event_index is None else self[event_index]  # type: ignore
@core_utilities.add_copy_option
def cut_out(  # type: ignore
    self,
    start: core_constants.DurationType,
    end: core_constants.DurationType,
) -> Consecution[T]:
    """Keep only the part of the sequence between ``start`` and ``end``.

    Children which overlap the range are cut with their own ``cut_out``,
    children outside of the range are removed.

    :param start: Start of the range to keep (converted to a duration).
    :param end: End of the range to keep (converted to a duration).
    """
    to_duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION
    start, end = to_duration(start), to_duration(end)
    self._assert_valid_absolute_time(start)
    self._assert_correct_start_and_end_values(start, end)
    index_to_remove_list = []
    for child_index, (child_start, child) in enumerate(
        zip(self.absolute_time_tuple, self)
    ):
        child_duration = child.duration
        child_end = child_start + child_duration
        # Range to keep, expressed relative to the child's own start.
        local_start: core_parameters.DirectDuration = (
            core_parameters.DirectDuration(0)
        )
        local_end = child_duration
        if child_start < start:
            local_start += start - child_start
        if child_end > end:
            local_end -= child_end - end
        if local_start < local_end:
            child.cut_out(local_start, local_end)
            continue
        # Support special case of events with duration = 0: they are
        # kept if they start inside of the requested range.
        is_zero_duration_child_in_range = (
            child.duration == 0 and child_start >= start and child_start <= end
        )
        if not is_zero_duration_child_in_range:
            index_to_remove_list.append(child_index)
    # Delete from the back, so the remaining indices stay valid.
    for child_index in reversed(index_to_remove_list):
        del self[child_index]
@core_utilities.add_copy_option
def cut_off(  # type: ignore
    self,
    start: core_constants.DurationType,
    end: core_constants.DurationType,
) -> Consecution[T]:
    """Remove the range between ``start`` and ``end`` from the sequence.

    The actual work is delegated to ``_cut_off``, so that subclasses can
    override this public method without affecting ``squash_in``.

    :param start: Start of the range to remove (converted to a duration).
    :param end: End of the range to remove (converted to a duration).
    """
    to_duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION
    start, end = to_duration(start), to_duration(end)
    self._assert_valid_absolute_time(start)
    # Avoid unnecessary iterations: nothing to do for an empty range.
    if (cut_off_duration := end - start) > 0:
        return self._cut_off(start, end, cut_off_duration)
@core_utilities.add_copy_option
def squash_in(  # type: ignore
    self,
    start: core_parameters.abc.Duration | typing.Any,
    event_to_squash_in: core_events.abc.Event,
) -> Consecution[T]:
    """Insert ``event_to_squash_in`` at ``start``, replacing what was there.

    First the range the new event occupies is cut off, then the event is
    inserted at the matching position. A child which is active at
    ``start`` is split before the insertion.

    :param start: Absolute time at which the event shall start.
    :param event_to_squash_in: The event to insert.
    """
    start = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(start)
    self._assert_valid_absolute_time(start)
    start_in_floats = start.duration_in_floats
    self._assert_start_in_range(start_in_floats)
    # Only run cut_off if necessary -> Improve performance
    squash_in_duration = event_to_squash_in.duration
    if squash_in_duration > 0:
        self._cut_off(start, start + squash_in_duration, squash_in_duration)
    # We already know that the given start is within the range of the
    # event. This means that if the start is bigger than the duration,
    # it is only due to a floating point rounding error. To avoid odd
    # bugs we therefore have to define the bigger-equal relationship.
    child_start_tuple, total_duration = self._absolute_time_in_floats_tuple_and_duration
    if start_in_floats >= total_duration:
        self.append(event_to_squash_in)
        return
    try:
        insert_index = child_start_tuple.index(start)
    # No child starts at the given point: there is an event on the given
    # point which needs to be split.
    except ValueError:
        active_index = Consecution._get_index_at_from_absolute_time_tuple(
            start_in_floats,
            child_start_tuple,
            total_duration,
        )
        split_position = start_in_floats - child_start_tuple[active_index]
        if (
            split_position > 0
            and split_position < self[active_index].duration
        ):
            split_part_tuple = self[active_index].split_at(split_position)
            self[active_index] = split_part_tuple[1]
            self.insert(active_index, split_part_tuple[0])
            active_index += 1
        insert_index = active_index
    self.insert(insert_index, event_to_squash_in)
@core_utilities.add_copy_option
def slide_in(
    self,
    start: core_parameters.abc.Duration,
    event_to_slide_in: core_events.abc.Event,
) -> Consecution[T]:
    """Insert ``event_to_slide_in`` at ``start`` without removing anything.

    The sequence is split at ``start`` and the new event is placed
    between both parts.

    :param start: Absolute time at which the event shall start.
    :param event_to_slide_in: The event to insert.
    :return: ``self``.
    """
    start = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(start)
    self._assert_valid_absolute_time(start)
    start_in_floats = start.duration_in_floats
    if start_in_floats == 0:
        self.insert(0, event_to_slide_in)
        return self
    self._assert_start_in_range(start_in_floats)
    try:
        head, tail = self.split_at(start)
        self[:] = head
    except ValueError:  # Only one event => start == duration.
        self.append(event_to_slide_in)
    else:
        self.extend([event_to_slide_in] + tail)
    return self
@core_utilities.add_copy_option
def split_child_at(
    self, absolute_time: core_parameters.abc.Duration | typing.Any
) -> Consecution[T]:
    """Split the child which is active at ``absolute_time``.

    Thin wrapper around ``_split_child_at`` which supplies the current
    start times and the total duration.

    :param absolute_time: Absolute time at which a child shall be split.
    """
    start_time_tuple, total_duration = self._absolute_time_in_floats_tuple_and_duration
    return self._split_child_at(absolute_time, start_time_tuple, total_duration)
def split_at(
    self,
    *absolute_time: core_parameters.abc.Duration,
    ignore_invalid_split_point: bool = False,
) -> tuple[Consecution, ...]:
    """Split a copy of the sequence at the given times.

    :param absolute_time: One or more absolute times where to split.
    :param ignore_invalid_split_point: If ``True`` a split time at which
        no child is active ends the splitting silently instead of raising.
    :return: Tuple with the slices of the copy between the split points.
    :raises core_utilities.NoSplitTimeError: If no time is given.
    :raises core_utilities.SplitError: If no child is active at a split
        time and ``ignore_invalid_split_point`` is ``False``.
    """
    if not absolute_time:
        raise core_utilities.NoSplitTimeError()
    child_start_tuple, duration_in_floats = (
        self._absolute_time_in_floats_tuple_and_duration
    )
    known_split_time_list = list(child_start_tuple)
    # NOTE: maybe we can add a 'mutate=False' keyword in case
    # someone doesn't care about keeping the old event and wants
    # to save some seconds of expensive copy-operation?
    split_copy = self.copy()
    boundary_index_list = []
    for position, split_time in enumerate(sorted(absolute_time)):
        if position == 0:  # First is smallest, check if it is < 0
            self._assert_valid_absolute_time(split_time)
        # Improve performance: don't try to split if we know it is
        # already split here. We also need to be sure to not add any
        # duplicates to 'known_split_time_list', so we need to check
        # anyway.
        if split_time in known_split_time_list:
            boundary_index_list.append(known_split_time_list.index(split_time))
            continue
        # It's okay to ignore, this is still within the given event
        # (if we don't continue '_split_child_at' raises an error).
        if split_time == duration_in_floats:
            continue
        try:
            child_index = split_copy._split_child_at(
                split_time, tuple(known_split_time_list), duration_in_floats
            )
        except core_utilities.SplitUnavailableChildError:
            if not ignore_invalid_split_point:
                raise core_utilities.SplitError(split_time)
            # We can stop, because if there isn't any child at this time
            # there won't be any child at a later time (remember: our
            # absolute times are sorted).
            break
        boundary_index_list.append(child_index)
        known_split_time_list.append(split_time)
        known_split_time_list.sort()
    # Add frame indices (if not already present)
    if 0 not in boundary_index_list:
        boundary_index_list.insert(0, 0)
    if (event_count := len(split_copy)) not in boundary_index_list:
        boundary_index_list.append(event_count)
    return tuple(
        split_copy[slice_start:slice_end]
        for slice_start, slice_end in zip(
            boundary_index_list, boundary_index_list[1:]
        )
    )
@core_utilities.add_copy_option
def extend_until(
    self,
    duration: core_parameters.abc.Duration,
    duration_to_white_space: typing.Optional[
        typing.Callable[[core_parameters.abc.Duration], core_events.abc.Event]
    ] = None,
    prolong_chronon: bool = True,
) -> Consecution[T]:
    """Append white space until the sequence is ``duration`` long.

    Nothing is appended if the sequence is already at least that long.

    :param duration: The target duration (converted to a duration).
    :param duration_to_white_space: Builds the appended event from the
        missing duration. Falls back to
        ``core_events.configurations.DEFAULT_DURATION_TO_WHITE_SPACE``.
    :param prolong_chronon: Accepted, but not read by this implementation.
    """
    duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(duration)
    make_white_space = (
        duration_to_white_space
        or core_events.configurations.DEFAULT_DURATION_TO_WHITE_SPACE
    )
    missing_duration = duration - self.duration
    if missing_duration > 0:
        self.append(make_white_space(missing_duration))
[docs]class Concurrence(core_events.abc.ComplexEvent, typing.Generic[T]):
"""Event-Object which contains other Event-Objects which happen at the same time."""
# ###################################################################### #
# private static methods #
# ###################################################################### #
@staticmethod
def _extend_ancestor(ancestor: core_events.abc.Event, event: core_events.abc.Event):
    """Concatenate ``event`` to ``ancestor`` in place.

    A ``Consecution`` is extended; a ``Concurrence`` is concatenated by
    tag and, if that raises ``NoTagError``, by index.

    :raises core_utilities.ConcatenationError: If ``ancestor`` has no
        ``_concatenate_tempo_envelope`` or is neither a ``Consecution`` nor
        a ``Concurrence``.
    """
    try:
        ancestor._concatenate_tempo_envelope(event)
    # We can't concatenate to a chronon.
    # We also can't concatenate to anything else.
    except AttributeError:
        raise core_utilities.ConcatenationError(ancestor, event)
    if isinstance(ancestor, core_events.Consecution):
        ancestor.extend(event)
    elif isinstance(ancestor, core_events.Concurrence):
        try:
            ancestor.concatenate_by_tag(event)
        except core_utilities.NoTagError:
            ancestor.concatenate_by_index(event)
    else:
        # This should already fail above, but if this strange object
        # somehow owned '_concatenate_tempo_envelope', it should
        # fail here.
        raise core_utilities.ConcatenationError(ancestor, event)
# ###################################################################### #
# private methods #
# ###################################################################### #
def _make_event_slice_tuple(
self,
absolute_time_list: list[core_parameters.abc.Duration],
slice_tuple_to_event: typing.Callable[
[tuple[core_parameters.abc.Event, ...]], core_parameters.abc.Event
],
) -> tuple[core_events.abc.Event, ...]:
"""Split at given times and cast split events into new events."""
# Slice all child events
slices = []
for e in self:
slices.append(
list(e.split_at(*absolute_time_list, ignore_invalid_split_point=True))
)
# Ensure all slices have the same amount of entries,
# because we use 'zip' later and if one of them is
# shorter we loose some parts of our event.
if slices:
slices_count_tuple = tuple(len(s) for s in slices)
max_slice_count = max(slices_count_tuple)
for s, c in zip(slices, slices_count_tuple):
if delta := max_slice_count - c:
s.extend([None] * delta)
# Finally, build new sequence from event slices
event_list = []
for slice_tuple in zip(*slices):
if slice_tuple := tuple(filter(bool, slice_tuple)):
e = slice_tuple_to_event(slice_tuple)
event_list.append(e)
return tuple(event_list)
# ###################################################################### #
# properties #
# ###################################################################### #
@core_events.abc.ComplexEvent.duration.getter
def duration(self) -> core_constants.DurationType:
    """Return the duration of the longest child.

    An empty ``Concurrence`` has a duration of ``DirectDuration(0)``.
    """
    # 'default' covers the empty case without an 'except ValueError',
    # which would also hide a 'ValueError' raised while comparing
    # durations and silently report a duration of 0.
    return max(
        (event.duration for event in self),
        default=core_parameters.DirectDuration(0),
    )
# ###################################################################### #
# public methods #
# ###################################################################### #
@core_utilities.add_copy_option
def cut_out(  # type: ignore
    self,
    start: core_parameters.abc.Duration | typing.Any,
    end: core_parameters.abc.Duration | typing.Any,
) -> Concurrence[T]:
    """Call ``cut_out(start, end)`` on every child.

    :param start: Start of the range to keep (converted to a duration).
    :param end: End of the range to keep (converted to a duration).
    """
    to_duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION
    start, end = to_duration(start), to_duration(end)
    self._assert_valid_absolute_time(start)
    self._assert_correct_start_and_end_values(start, end)
    for child in self:
        child.cut_out(start, end)
@core_utilities.add_copy_option
def cut_off(  # type: ignore
    self,
    start: core_constants.DurationType,
    end: core_constants.DurationType,
) -> Concurrence[T]:
    """Call ``cut_off(start, end)`` on every child.

    :param start: Start of the range to remove (converted to a duration).
    :param end: End of the range to remove (converted to a duration).
    """
    to_duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION
    start, end = to_duration(start), to_duration(end)
    self._assert_valid_absolute_time(start)
    self._assert_correct_start_and_end_values(start, end)
    for child in self:
        child.cut_off(start, end)
@core_utilities.add_copy_option
def squash_in(  # type: ignore
    self,
    start: core_parameters.abc.Duration | typing.Any,
    event_to_squash_in: core_events.abc.Event,
) -> Concurrence[T]:
    """Squash ``event_to_squash_in`` into every child at ``start``.

    :param start: Absolute time at which the event shall start.
    :param event_to_squash_in: The event to insert.
    :raises core_utilities.ImpossibleToSquashInError: If calling
        ``squash_in`` on a child raises ``AttributeError``.
    """
    start = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(start)
    self._assert_valid_absolute_time(start)
    self._assert_start_in_range(start)
    for child in self:
        try:
            child.squash_in(start, event_to_squash_in)  # type: ignore
        # Simple events don't have a 'squash_in' method.
        except AttributeError:
            raise core_utilities.ImpossibleToSquashInError(self, event_to_squash_in)
@core_utilities.add_copy_option
def slide_in(
    self,
    start: core_parameters.abc.Duration,
    event_to_slide_in: core_events.abc.Event,
) -> Concurrence[T]:
    """Slide ``event_to_slide_in`` into every child at ``start``.

    :param start: Absolute time at which the event shall start.
    :param event_to_slide_in: The event to insert.
    :raises core_utilities.ImpossibleToSlideInError: If calling
        ``slide_in`` on a child raises ``AttributeError``.
    """
    start = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(start)
    self._assert_valid_absolute_time(start)
    self._assert_start_in_range(start)
    for child in self:
        try:
            child.slide_in(start, event_to_slide_in)  # type: ignore
        # Simple events don't have a 'slide_in' method.
        except AttributeError:
            raise core_utilities.ImpossibleToSlideInError(self, event_to_slide_in)
@core_utilities.add_copy_option
def split_child_at(
    self, absolute_time: core_constants.DurationType
) -> Concurrence[T]:
    """Split every child at ``absolute_time``.

    Children whose ``split_child_at`` call raises ``AttributeError`` are
    split with ``split_at`` and replaced by a ``Consecution`` of the parts.

    :param absolute_time: Absolute time at which the children are split.
    """
    for child_index, child in enumerate(self):
        try:
            child.split_child_at(absolute_time)
        # chronons don't have a 'split_child_at' method
        except AttributeError:
            part_tuple = child.split_at(absolute_time)
            self[child_index] = Consecution(part_tuple)
@core_utilities.add_copy_option
def extend_until(
    self,
    duration: typing.Optional[core_parameters.abc.Duration] = None,
    duration_to_white_space: typing.Optional[
        typing.Callable[[core_parameters.abc.Duration], core_events.abc.Event]
    ] = None,
    prolong_chronon: bool = True,
) -> Concurrence[T]:
    """Extend every child until it is ``duration`` long.

    :param duration: The target duration. If ``None`` the duration of the
        ``Concurrence`` itself (its longest child) is used.
    :param duration_to_white_space: Forwarded to the children. Falls back
        to ``core_events.configurations.DEFAULT_DURATION_TO_WHITE_SPACE``.
    :param prolong_chronon: If ``True`` the duration of a child without a
        working ``extend_until`` is increased instead. If ``False`` such a
        child raises an error.
    :raises core_utilities.IneffectiveExtendUntilError: If the
        ``Concurrence`` is empty.
    :raises core_utilities.ImpossibleToExtendUntilError: If a child can't
        be extended and ``prolong_chronon`` is ``False``.
    """
    if duration is None:
        duration = self.duration
    else:
        duration = core_events.configurations.UNKNOWN_OBJECT_TO_DURATION(duration)
    duration_to_white_space = (
        duration_to_white_space
        or core_events.configurations.DEFAULT_DURATION_TO_WHITE_SPACE
    )
    # We only append chronons to consecutions, because there
    # are many problems with the Concurrence[SimpleEvent] construct
    # ('extend_until' and 'squash_in' will fail on such a container).
    # Therefore calling 'extend_until' on an empty Concurrence is
    # in fact ineffective: The user would get a Concurrence which
    # still has duration = 0, which is absolutely unexpected. Therefore
    # we raise an error, to avoid confusion by the user.
    if not self:
        raise core_utilities.IneffectiveExtendUntilError(self)
    for child in self:
        try:
            child.extend_until(duration, duration_to_white_space, prolong_chronon)
        # SimpleEvent
        except AttributeError:
            if not prolong_chronon:
                raise core_utilities.ImpossibleToExtendUntilError(child)
            if (missing_duration := duration - child.duration) > 0:
                child.duration += missing_duration
@core_utilities.add_copy_option
def concatenate_by_index(self, other: Concurrence) -> Concurrence:
    """Concatenate with other :class:`~mutwo.core_events.Concurrence` along their indices.

    :param other: The other `Concurrence` with which to concatenate.
        The other `Concurrence` can contain more or less events.
    :type other: Concurrence
    :param mutate: If ``False`` the function will return a copy of the given object.
        If set to ``True`` the object itself will be changed and the function will
        return the changed object. Default to ``True``.
    :type mutate: bool
    :raises core_utilities.ConcatenationError: If there are any :class:`SimpleEvent`
        inside a :class:`Concurrence`.

    **Hint:**

    Similarly to Python's ``list.extend`` the concatenation simply appends
    the children of the other event to the sequence without copying them.
    This means when changing the children in the new event, it also changes
    the child event in the original sequence. If you want to avoid this,
    call ``event.copy()`` before concatenating it to the host event.

    **Example:**

    >>> from mutwo import core_events
    >>> s = core_events.Concurrence(
    ...     [core_events.Consecution([core_events.SimpleEvent(1)])]
    ... )
    >>> s.concatenate_by_index(s)
    Concurrence([Consecution([SimpleEvent(duration = DirectDuration(duration = 1)), SimpleEvent(duration = DirectDuration(duration = 1))])])
    """
    # Bring all own children to the same length first, so that the
    # concatenated events start at the same time.
    if (self_duration := self.duration) > 0:
        self.extend_until(self_duration)
    for child_index, other_child in enumerate(other):
        try:
            ancestor = self[child_index]
        except IndexError:
            # 'other' has more children than we have: append the child,
            # delayed by our own duration.
            if self_duration > 0:
                # Shallow copy before 'slide_in': We use the same
                # events, but we don't want to change the other sequence.
                delayed_child = other_child.empty_copy()
                delayed_child.extend(other_child[:])
                other_child = delayed_child.slide_in(
                    0, core_events.SimpleEvent(self_duration)
                )
            self.append(other_child)
        else:
            self._extend_ancestor(ancestor, other_child)
@core_utilities.add_copy_option
def concatenate_by_tag(self, other: Concurrence) -> Concurrence:
    """Concatenate with other :class:`~mutwo.core_events.Concurrence` along their tags.

    Each child of ``other`` is joined onto the child of this event which
    is found with the same tag. Children of ``other`` whose tag can't be
    found in this event are appended; if this event already has a
    duration, they are first prefixed with a :class:`SimpleEvent` of that
    duration.

    :param other: The other `Concurrence` with which to concatenate.
        The other `Concurrence` can contain more or less events.
    :type other: Concurrence
    :param mutate: If ``False`` the function will return a copy of the given object.
        If set to ``True`` the object itself will be changed and the function will
        return the changed object. Default to ``True``.
    :type mutate: bool
    :return: Concatenated event.
    :raises core_utilities.NoTagError: If any child event doesn't have a 'tag'
        attribute.
    :raises core_utilities.ConcatenationError: If there are any :class:`SimpleEvent`
        inside a :class:`Concurrence`.

    **Hint:**

    Similarly to Python's ``list.extend`` the concatenation simply appends
    the children of the other event to the sequence without copying them.
    This means when changing the children in the new event, it also changes
    the child event in the original sequence. If you want to avoid this,
    call ``event.copy()`` before concatenating it to the host event.

    **Example:**

    >>> from mutwo import core_events
    >>> s = core_events.Concurrence(
    ...      [core_events.TaggedConsecution([core_events.SimpleEvent(1)], tag="test")]
    ... )
    >>> s.concatenate_by_tag(s)
    Concurrence([TaggedConsecution([SimpleEvent(duration = DirectDuration(duration = 1)), SimpleEvent(duration = DirectDuration(duration = 1))])])
    """
    self_duration = self.duration
    if self_duration > 0:
        self.extend_until(self_duration)

    for child in other:
        if not hasattr(child, "tag"):
            raise core_utilities.NoTagError(child)
        try:
            ancestor = self[child.tag]
        except KeyError:
            if self_duration > 0:
                # Shallow copy before 'slide_in': the very same child
                # events are reused, but 'other' must stay untouched.
                padded_child = child.empty_copy()
                padded_child.extend(child[:])
                child = padded_child.slide_in(
                    0, core_events.SimpleEvent(self_duration)
                )
            self.append(child)
        else:
            self._extend_ancestor(ancestor, child)
# NOTE: 'sequentialize' is very generic: it works for all types of child
# event structures. This is good, but in its current form it's mostly
# only useful with rather long and complex user-defined 'slice_tuple_to_event'
# definitions. For instance when sequentializing
# Concurrence[Consecution[SimpleEvent]] the returned event will be
# Consecution[Concurrence[Consecution[SimpleEvent]]]. Here the
# inner consecutions are always pointless, since they will always only
# contain one chronon.
def sequentialize(
    self,
    slice_tuple_to_event: typing.Optional[
        typing.Callable[
            [tuple[core_events.abc.Event, ...]], core_events.abc.Event
        ]
    ] = None,
) -> core_events.Consecution:
    """Convert parallel structure to a sequential structure.

    :param slice_tuple_to_event: In order to sequentialize the event
        `mutwo` splits each child event into small 'event slices'. These
        'event slices' are simply events created by the `split_at` method.
        Each of those parallel slice groups needs to be bound together to
        one new event. These new events are sequentially ordered to result
        in a new sequential structure. The simplest and default way to
        achieve this is by simply putting all event parts into a new
        :class:`Concurrence`, so the resulting :class:`Consecution`
        will be a sequence of `Concurrence`. This parameter is
        available so that users can convert their parallel structure in
        meaningful ways (for instance to imitate the ``.chordify``
        `method from music21 <https://web.mit.edu/music21/doc/usersGuide/usersGuide_09_chordify.html>`_
        which transforms polyphonic music to a chord structure).
        If ``None`` `slice_tuple_to_event` is set to
        :class:`Concurrence`. Default to ``None``.
    :type slice_tuple_to_event: typing.Optional[typing.Callable[[tuple[core_events.abc.Event, ...]], core_events.abc.Event]]
    :return: A :class:`Consecution` which contains one event (built by
        ``slice_tuple_to_event``) per group of parallel slices.

    **Example:**

    >>> from mutwo import core_events
    >>> e = core_events.Concurrence(
    ...     [
    ...         core_events.Consecution(
    ...             [core_events.SimpleEvent(2), core_events.SimpleEvent(1)]
    ...         ),
    ...         core_events.Consecution(
    ...             [core_events.SimpleEvent(3)]
    ...         ),
    ...     ]
    ... )
    >>> e.sequentialize()
    Consecution([Concurrence([Consecution([SimpleEvent(duration = DirectDuration(duration = 2))]), Consecution([SimpleEvent(duration = DirectDuration(duration = 2))])]), Concurrence([Consecution([SimpleEvent(duration = DirectDuration(duration = 1))]), Consecution([SimpleEvent(duration = DirectDuration(duration = 1))])])])
    """
    if slice_tuple_to_event is None:
        slice_tuple_to_event = Concurrence

    # Find all start/end times
    absolute_time_set = set()
    for e in self:
        try:  # Consecution
            (
                absolute_time_tuple,
                duration,
            ) = e._absolute_time_in_floats_tuple_and_duration
        except AttributeError:  # SimpleEvent or Concurrence
            absolute_time_tuple, duration = (0,), e.duration.duration_in_floats
        absolute_time_set.update(absolute_time_tuple + (duration,))

    # Sort, but also remove the last entry: we don't need
    # to split at complete duration, because after duration
    # there isn't any event left in any child.
    absolute_time_list = sorted(absolute_time_set)[:-1]

    return core_events.Consecution(
        self._make_event_slice_tuple(absolute_time_list, slice_tuple_to_event)
    )
def split_at(
    self,
    *absolute_time: core_parameters.abc.Duration,
    ignore_invalid_split_point: bool = False,
) -> tuple[Concurrence, ...]:
    """Split the event at one or more absolute times.

    The given times are sorted before they are used. Each resulting part
    is an empty copy of this event which is filled with the matching
    slices.

    :param absolute_time: The times at which the event is split.
    :type absolute_time: core_parameters.abc.Duration
    :param ignore_invalid_split_point: If ``True`` no error is raised when
        the latest split time is bigger than the duration of the event.
        Default to ``False``.
    :type ignore_invalid_split_point: bool
    :raises core_utilities.NoSplitTimeError: If no split time is given.
    :raises core_utilities.SplitError: If the latest split time is bigger
        than the duration of the event and ``ignore_invalid_split_point``
        is ``False``.
    """
    if not absolute_time:
        raise core_utilities.NoSplitTimeError()

    split_time_list = sorted(absolute_time)
    earliest, latest = split_time_list[0], split_time_list[-1]

    # Only the earliest time is passed to the validity check.
    self._assert_valid_absolute_time(earliest)
    if latest > self.duration and not ignore_invalid_split_point:
        raise core_utilities.SplitError(latest)

    def rebuild_from_slice_tuple(slice_tuple):
        # Use an empty copy of 'self' as the container of each part.
        container = self.empty_copy()
        container[:] = slice_tuple
        return container

    return self._make_event_slice_tuple(split_time_list, rebuild_from_slice_tuple)
@core_utilities.add_tag_to_class
class TaggedSimpleEvent(SimpleEvent):
    """A :class:`SimpleEvent` with a tag.

    The class adds no members of its own; it is decorated with
    ``core_utilities.add_tag_to_class``.
    """
@core_utilities.add_tag_to_class
class TaggedConsecution(
    Consecution, typing.Generic[T], class_specific_side_attribute_tuple=("tag",)
):
    """A :class:`Consecution` with a tag.

    The class adds no members of its own; it is decorated with
    ``core_utilities.add_tag_to_class`` and declares ``"tag"`` in its
    ``class_specific_side_attribute_tuple``.
    """
@core_utilities.add_tag_to_class
class TaggedConcurrence(
    Concurrence, typing.Generic[T], class_specific_side_attribute_tuple=("tag",)
):
    """A :class:`Concurrence` with a tag.

    The class is decorated with ``core_utilities.add_tag_to_class`` and
    declares ``"tag"`` in its ``class_specific_side_attribute_tuple``.
    """

    def sequentialize(self, *args, **kwargs):
        """Sequentialize the event and keep its tag.

        All arguments are forwarded to the ``sequentialize`` method of the
        parent class. The result is wrapped in a
        :class:`TaggedConsecution` which carries the tag of this event.
        """
        return TaggedConsecution(
            super().sequentialize(*args, **kwargs), tag=self.tag
        )