Skip to content

Branches

Branch

A branch of the graph.

The join parameter is extracted from the last element of the edges.

Parameters:

Name Type Description Default
edges BranchContainer[T, S]

The edges of the branch.

required
source SingleSource[T, S]

The source of the branch, which is handled as start node of the branch.

required
Source code in src/edgygraph/graph/branches.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class Branch[T: StateProtocol, S: SharedProtocol]:
    """
    A branch of the graph.

    The join parameter is extracted from the last element of the edges.


    Args:
        edges: The edges of the branch.
        source: The source of the branch, which is handled as start node of the branch.

    """


    def __init__(self, edges: BranchContainer[T, S], source: SingleSource[T, S]) -> None:

        self.edges = edges[:-1]
        self.source = source
        self.join = edges[-1]

        self.result: asyncio.Future[dict[tuple[Hashable, ...], Change]] | None = None

        self.edge_index: dict[SingleSource[T, S], list[Entry[T, S]]] = defaultdict(list)
        self.error_edge_index: dict[SingleErrorSource[T, S], list[ErrorEntry[T, S]]] = defaultdict(list)

        self.index_edges()


    def index_edges(self) -> None:
        """
        Index the edges by single source.
        """


        for i, (source, next) in enumerate(zip(self.edges, self.edges[1:])):

            if not Types[T, S].is_any_source(source) and not Types[T, S].is_next(next):
                raise ValueError(f"Invalid edge: source: {source}, next: {next} in branch with source {self.source} and join {self.join}")

            if Types[T, S].is_any_source(source) and Types[T, S].is_next_with_config(next):

                try:

                    filtered_source = self.filter_source_by_config(source)
                    filtered_next = self.filter_next_by_config(next)

                except EmptyFilterResult:

                    if Types[T, S].is_any_source(next) and Types[T, S].is_next_with_config(source):
                        try:
                            self.filter_source_by_config(next)
                            self.filter_next_by_config(source)
                            continue
                        except EmptyFilterResult:
                            raise ValueError(f"No edge after filtering and source is empty after filtering as next or next is empty after filtering as source: source: {source}, next: {next} in branch with source {self.source} and join {self.join}")
                    else:
                        raise ValueError(f"No edge after filtering and source is not a valid next and next is not a valid source: source: {source}, next: {next} in branch with source {self.source} and join {self.join}")


                sources = filtered_source if isinstance(filtered_source, list) else [filtered_source]

                for s in sources:

                    if Types[T, S].is_single_source(s):
                        self.index_edge(Edge(source=s, next=filtered_next), i)
                    elif Types[T, S].is_single_error_source(s):
                        self.index_edge(ErrorEdge(source=s, next=filtered_next), i)
                    else:
                        raise ValueError(f"Invalid filtered source: {filtered_source} from original source: {source} in branch with source {self.source} and join {self.join}")



    def index_edge(self, edge: Edge[T, S] | ErrorEdge[T, S], index: int) -> None:
        """
        Index a single edge by its source.

        Append the edge to
        - `edge_index` if the edge is a normal edge
        - `error_edge_index` if the edge is an error edge

        Args:
            edge: The edge to index.
            index: The original index of the edge in the list of edges of the branch.
        """

        print(f"Indexing edge {edge} at index {index} in branch with source {self.source} and join {self.join}")

        match edge:
            case ErrorEdge(source=source, next=next):
                self.error_edge_index[source].append(ErrorEntry[T, S](next=next, index=index))
            case Edge(source=source, next=next):
                self.edge_index[source].append(Entry[T, S](next=next, index=index))



    def filter_source_by_config(self, source: SourceWithConfig[T, S] | ErrorSource[T, S]) -> Source[T, S] | ErrorSource[T, S]:
        """
        Filter the source by its config.

        The config is used to track the Operators `-` and `+` on nodes.

        Args:
            source: The source to filter.

        Returns:
            The filtered source without config.

        Raises:
            EmptyFilterResult: If the whole source is filtered out.
        """

        if Types[T, S].is_single_source_with_config(source):

            if isinstance(source, tuple):

                (source, config) = source

                if config.only_next:
                    raise EmptyFilterResult()

            return source

        elif Types[T, S].is_single_source_with_config_list(source):

            filtered_sources: list[SingleSource[T, S]] = []

            for single_source in source:

                if isinstance(single_source, tuple):

                    (single_source, config) = single_source

                    if not config.only_next:
                        filtered_sources.append(single_source)

                else:
                    filtered_sources.append(single_source)

            if filtered_sources == []:
                raise EmptyFilterResult()

            return filtered_sources

        elif Types[T, S].is_error_source(source):

            return source # Error sources have no filters


        else:
            raise ValueError(f"Invalid source: {source} in branch with source {self.source} and join {self.join}")



    def filter_next_by_config(self, next: NextWithConfig[T, S]) -> Next[T, S]:
        """
        Filter the next by its config.

        The config is used to track the Operators `-` and `+` on nodes.

        Args:
            next: The next to filter.

        Returns:
            The filtered next without the config.

        Raises:
            EmptyFilterResult: If the next is filtered out.
        """

        if Types[T, S].is_next_callable(next):
            return next

        elif Types[T, S].is_single_next_with_config(next):

            if isinstance(next, tuple):

                (next, config) = next

                if config.only_source:
                    raise EmptyFilterResult()

            return next

        elif Types[T, S].is_single_next_with_config_list(next):

            filtered_next: list[SingleNext[T, S]] = []

            for single_next in next:

                if isinstance(single_next, tuple):

                    (single_next, config) = single_next

                    if not config.only_source:
                        filtered_next.append(single_next)

                else:
                    filtered_next.append(single_next)

            if filtered_next == []:
                raise EmptyFilterResult()

            return filtered_next

        else:
            raise ValueError(f"Invalid next: {next} in branch with source {self.source} and join {self.join}")

index_edges()

Index the edges by single source.

Source code in src/edgygraph/graph/branches.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def index_edges(self) -> None:
    """
    Index the edges by single source.
    """


    for i, (source, next) in enumerate(zip(self.edges, self.edges[1:])):

        if not Types[T, S].is_any_source(source) and not Types[T, S].is_next(next):
            raise ValueError(f"Invalid edge: source: {source}, next: {next} in branch with source {self.source} and join {self.join}")

        if Types[T, S].is_any_source(source) and Types[T, S].is_next_with_config(next):

            try:

                filtered_source = self.filter_source_by_config(source)
                filtered_next = self.filter_next_by_config(next)

            except EmptyFilterResult:

                if Types[T, S].is_any_source(next) and Types[T, S].is_next_with_config(source):
                    try:
                        self.filter_source_by_config(next)
                        self.filter_next_by_config(source)
                        continue
                    except EmptyFilterResult:
                        raise ValueError(f"No edge after filtering and source is empty after filtering as next or next is empty after filtering as source: source: {source}, next: {next} in branch with source {self.source} and join {self.join}")
                else:
                    raise ValueError(f"No edge after filtering and source is not a valid next and next is not a valid source: source: {source}, next: {next} in branch with source {self.source} and join {self.join}")


            sources = filtered_source if isinstance(filtered_source, list) else [filtered_source]

            for s in sources:

                if Types[T, S].is_single_source(s):
                    self.index_edge(Edge(source=s, next=filtered_next), i)
                elif Types[T, S].is_single_error_source(s):
                    self.index_edge(ErrorEdge(source=s, next=filtered_next), i)
                else:
                    raise ValueError(f"Invalid filtered source: {filtered_source} from original source: {source} in branch with source {self.source} and join {self.join}")

index_edge(edge, index)

Index a single edge by its source.

Append the edge to - edge_index if the edge is a normal edge - error_edge_index if the edge is an error edge

Parameters:

Name Type Description Default
edge Edge[T, S] | ErrorEdge[T, S]

The edge to index.

required
index int

The original index of the edge in the list of edges of the branch.

required
Source code in src/edgygraph/graph/branches.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def index_edge(self, edge: Edge[T, S] | ErrorEdge[T, S], index: int) -> None:
    """
    Index a single edge by its source.

    Append the edge to
    - `edge_index` if the edge is a normal edge
    - `error_edge_index` if the edge is an error edge

    Args:
        edge: The edge to index.
        index: The original index of the edge in the list of edges of the branch.
    """

    print(f"Indexing edge {edge} at index {index} in branch with source {self.source} and join {self.join}")

    match edge:
        case ErrorEdge(source=source, next=next):
            self.error_edge_index[source].append(ErrorEntry[T, S](next=next, index=index))
        case Edge(source=source, next=next):
            self.edge_index[source].append(Entry[T, S](next=next, index=index))

filter_source_by_config(source)

Filter the source by its config.

The config is used to track the Operators - and + on nodes.

Parameters:

Name Type Description Default
source SourceWithConfig[T, S] | ErrorSource[T, S]

The source to filter.

required

Returns:

Type Description
Source[T, S] | ErrorSource[T, S]

The filtered source without config.

Raises:

Type Description
EmptyFilterResult

If the whole source is filtered out.

Source code in src/edgygraph/graph/branches.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def filter_source_by_config(self, source: SourceWithConfig[T, S] | ErrorSource[T, S]) -> Source[T, S] | ErrorSource[T, S]:
    """
    Filter the source by its config.

    The config is used to track the Operators `-` and `+` on nodes.

    Args:
        source: The source to filter.

    Returns:
        The filtered source without config.

    Raises:
        EmptyFilterResult: If the whole source is filtered out.
    """

    if Types[T, S].is_single_source_with_config(source):

        if isinstance(source, tuple):

            (source, config) = source

            if config.only_next:
                raise EmptyFilterResult()

        return source

    elif Types[T, S].is_single_source_with_config_list(source):

        filtered_sources: list[SingleSource[T, S]] = []

        for single_source in source:

            if isinstance(single_source, tuple):

                (single_source, config) = single_source

                if not config.only_next:
                    filtered_sources.append(single_source)

            else:
                filtered_sources.append(single_source)

        if filtered_sources == []:
            raise EmptyFilterResult()

        return filtered_sources

    elif Types[T, S].is_error_source(source):

        return source # Error sources have no filters


    else:
        raise ValueError(f"Invalid source: {source} in branch with source {self.source} and join {self.join}")

filter_next_by_config(next)

Filter the next by its config.

The config is used to track the Operators - and + on nodes.

Parameters:

Name Type Description Default
next NextWithConfig[T, S]

The next to filter.

required

Returns:

Type Description
Next[T, S]

The filtered next without the config.

Raises:

Type Description
EmptyFilterResult

If the next is filtered out.

Source code in src/edgygraph/graph/branches.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def filter_next_by_config(self, next: NextWithConfig[T, S]) -> Next[T, S]:
    """
    Filter the next by its config.

    The config is used to track the Operators `-` and `+` on nodes.

    Args:
        next: The next to filter.

    Returns:
        The filtered next without the config.

    Raises:
        EmptyFilterResult: If the next is filtered out.
    """

    if Types[T, S].is_next_callable(next):
        return next

    elif Types[T, S].is_single_next_with_config(next):

        if isinstance(next, tuple):

            (next, config) = next

            if config.only_source:
                raise EmptyFilterResult()

        return next

    elif Types[T, S].is_single_next_with_config_list(next):

        filtered_next: list[SingleNext[T, S]] = []

        for single_next in next:

            if isinstance(single_next, tuple):

                (single_next, config) = single_next

                if not config.only_source:
                    filtered_next.append(single_next)

            else:
                filtered_next.append(single_next)

        if filtered_next == []:
            raise EmptyFilterResult()

        return filtered_next

    else:
        raise ValueError(f"Invalid next: {next} in branch with source {self.source} and join {self.join}")

EmptyFilterResult

Bases: Exception

Exception raised when a filter result is empty.

Source code in src/edgygraph/graph/branches.py
220
221
222
223
class EmptyFilterResult(Exception):
    """
    Exception raised when a filter result is empty.
    """