Skip to content

Hooks

GraphHook

Bases: ABC

Hook for the graph execution.

Hooks are called at different stages of the graph execution. They can be used to log, modify the state, or perform other actions.

Source code in src/edgygraph/graph/hooks.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class GraphHook[T: StateProtocol, S: SharedProtocol](ABC):
    """
    Hook for the graph execution.

    Hooks are called at different stages of the graph execution.
    They can be used to log, modify the state, or perform other actions.
    """

    async def on_graph_start(self, state: T, shared: S) -> None:
        """
        Called when the graph starts.

        Args:
            state: The initial state of the graph.
            shared: The initial shared state of the graph.
        """

        pass


    async def on_step_start(self, state: T, shared: S, nodes: list[NextNode[T, S]]) -> None:
        """
        Called when a step starts.

        Args:
            state: The state of the graph.
            shared: The shared state of the graph.
            nodes: The nodes that will be executed in this step.
        """

        pass


    async def on_step_end(self, state: T, shared: S, nodes: list[NextNode[T, S]]) -> None:
        """
        Called when a step ends.

        It is called after all nodes have been executed and the state has been merged.

        Args:
            state: The updated state of the graph.
            shared: The shared state of the graph.
            nodes: The nodes that were executed in this step.
        """

        pass


    async def on_spawn_branch_start(self, state: T, shared: S, branch: Branch[T, S], trigger: NextNode[T, S], branch_registry: dict[SingleSource[T, S], list[Branch[T, S]]], join_registry: dict[BranchJoin[T, S], list[Branch[T, S]]]):
        """
        Called before a branch is spawned.

        Args:
            state: The state of the graph.
            shared: The shared state of the graph.
            branch: The branch to be spawned.
            trigger: The node that spawned the branch.
            branch_registry: The branch registry of the graph.
            join_registry: The current join registry of the graph.
        """

        pass


    async def on_spawn_branch_end(self, state: T, shared: S, branch: Branch[T, S], trigger: NextNode[T, S], branch_registry: dict[SingleSource[T, S], list[Branch[T, S]]], join_registry: dict[BranchJoin[T, S], list[Branch[T, S]]]):
        """
        Called after a branch is spawned.

        Args:
            state: The state of the graph.
            shared: The shared state of the graph.
            branch: The branch that was spawned.
            trigger: The node that spawned the branch.
            branch_registry: The branch registry of the graph.
            join_registry: The current join registry of the graph.
        """

        pass



    async def on_merge_start(self, state: T, result_states: list[T], changes: list[dict[tuple[Hashable, ...], Change]]) -> None:
        """
        Called when the merge process starts.

        Args:
            state: The old state of the graph.
            result_states: The result states of the nodes.
            changes: The changes that will be applied to the state.
        """

        pass


    async def on_merge_conflict(self, state: T, changes: list[dict[tuple[Hashable, ...], Change]], conflicts: dict[tuple[Hashable, ...], list[Change]]) -> None:
        """
        Called when a merge conflict occurs.

        Args:
            state: The old state of the graph.
            changes: The changes that will be applied to the state.
            conflicts: The conflicts that occurred during the merge process.
        """

        pass


    async def on_merge_end(self, state: T, result_states: list[T], changes: list[dict[tuple[Hashable, ...], Change]], merged_state: T) -> None:
        """
        Called when the merge process ends.

        Args:
            state: The old state of the graph.
            result_states: The result states of the nodes.
            changes: The changes that have been applied to the state.
            merged_state: The new merged state of the graph.
        """

        pass


    async def on_graph_end(self, state: T, shared: S) -> None:
        """
        Called when the graph execution ends.

        Args:
            state: The final state of the graph.
            shared: The final shared data.
        """

        pass


    async def on_error(self, error: Exception, state: T, shared: S) -> Exception | None:
        """
        Called when an error occurs during the graph execution.

        Args:
            error: The error that occurred.

        Returns:
           The error to raise, or None not to raise an error.
        """

        return error

on_graph_start(state, shared) async

Called when the graph starts.

Parameters:

Name Type Description Default
state T

The initial state of the graph.

required
shared S

The initial shared state of the graph.

required
Source code in src/edgygraph/graph/hooks.py
19
20
21
22
23
24
25
26
27
28
async def on_graph_start(self, state: T, shared: S) -> None:
    """
    Hook invoked when the graph starts.

    The default implementation does nothing.

    Args:
        state: Initial state of the graph.
        shared: Initial shared state of the graph.
    """
    return None

on_step_start(state, shared, nodes) async

Called when a step starts.

Parameters:

Name Type Description Default
state T

The state of the graph.

required
shared S

The shared state of the graph.

required
nodes list[NextNode[T, S]]

The nodes that will be executed in this step.

required
Source code in src/edgygraph/graph/hooks.py
31
32
33
34
35
36
37
38
39
40
41
async def on_step_start(
    self,
    state: T,
    shared: S,
    nodes: list[NextNode[T, S]],
) -> None:
    """
    Hook invoked when a step starts.

    The default implementation does nothing.

    Args:
        state: State of the graph.
        shared: Shared state of the graph.
        nodes: Nodes that will be executed in this step.
    """
    return None

on_step_end(state, shared, nodes) async

Called when a step ends.

It is called after all nodes have been executed and the state has been merged.

Parameters:

Name Type Description Default
state T

The updated state of the graph.

required
shared S

The shared state of the graph.

required
nodes list[NextNode[T, S]]

The nodes that were executed in this step.

required
Source code in src/edgygraph/graph/hooks.py
44
45
46
47
48
49
50
51
52
53
54
55
56
async def on_step_end(
    self,
    state: T,
    shared: S,
    nodes: list[NextNode[T, S]],
) -> None:
    """
    Hook invoked when a step ends.

    It runs after all nodes have been executed and the state has been merged.
    The default implementation does nothing.

    Args:
        state: Updated state of the graph.
        shared: Shared state of the graph.
        nodes: Nodes that were executed in this step.
    """
    return None

on_spawn_branch_start(state, shared, branch, trigger, branch_registry, join_registry) async

Called before a branch is spawned.

Parameters:

Name Type Description Default
state T

The state of the graph.

required
shared S

The shared state of the graph.

required
branch Branch[T, S]

The branch to be spawned.

required
trigger NextNode[T, S]

The node that spawned the branch.

required
branch_registry dict[SingleSource[T, S], list[Branch[T, S]]]

The branch registry of the graph.

required
join_registry dict[BranchJoin[T, S], list[Branch[T, S]]]

The current join registry of the graph.

required
Source code in src/edgygraph/graph/hooks.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
async def on_spawn_branch_start(
    self,
    state: T,
    shared: S,
    branch: Branch[T, S],
    trigger: NextNode[T, S],
    branch_registry: dict[SingleSource[T, S], list[Branch[T, S]]],
    join_registry: dict[BranchJoin[T, S], list[Branch[T, S]]],
) -> None:
    """
    Called before a branch is spawned.

    The default implementation does nothing.

    Args:
        state: The state of the graph.
        shared: The shared state of the graph.
        branch: The branch to be spawned.
        trigger: The node that spawned the branch.
        branch_registry: The branch registry of the graph.
        join_registry: The current join registry of the graph.
    """

on_spawn_branch_end(state, shared, branch, trigger, branch_registry, join_registry) async

Called after a branch is spawned.

Parameters:

Name Type Description Default
state T

The state of the graph.

required
shared S

The shared state of the graph.

required
branch Branch[T, S]

The branch that was spawned.

required
trigger NextNode[T, S]

The node that spawned the branch.

required
branch_registry dict[SingleSource[T, S], list[Branch[T, S]]]

The branch registry of the graph.

required
join_registry dict[BranchJoin[T, S], list[Branch[T, S]]]

The current join registry of the graph.

required
Source code in src/edgygraph/graph/hooks.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def on_spawn_branch_end(
    self,
    state: T,
    shared: S,
    branch: Branch[T, S],
    trigger: NextNode[T, S],
    branch_registry: dict[SingleSource[T, S], list[Branch[T, S]]],
    join_registry: dict[BranchJoin[T, S], list[Branch[T, S]]],
) -> None:
    """
    Called after a branch is spawned.

    The default implementation does nothing.

    Args:
        state: The state of the graph.
        shared: The shared state of the graph.
        branch: The branch that was spawned.
        trigger: The node that spawned the branch.
        branch_registry: The branch registry of the graph.
        join_registry: The current join registry of the graph.
    """

on_merge_start(state, result_states, changes) async

Called when the merge process starts.

Parameters:

Name Type Description Default
state T

The old state of the graph.

required
result_states list[T]

The result states of the nodes.

required
changes list[dict[tuple[Hashable, ...], Change]]

The changes that will be applied to the state.

required
Source code in src/edgygraph/graph/hooks.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def on_merge_start(
    self,
    state: T,
    result_states: list[T],
    changes: list[dict[tuple[Hashable, ...], Change]],
) -> None:
    """
    Hook invoked when the merge process starts.

    The default implementation does nothing.

    Args:
        state: Old state of the graph.
        result_states: Result states of the nodes.
        changes: Changes that will be applied to the state.
    """
    return None

on_merge_conflict(state, changes, conflicts) async

Called when a merge conflict occurs.

Parameters:

Name Type Description Default
state T

The old state of the graph.

required
changes list[dict[tuple[Hashable, ...], Change]]

The changes that will be applied to the state.

required
conflicts dict[tuple[Hashable, ...], list[Change]]

The conflicts that occurred during the merge process.

required
Source code in src/edgygraph/graph/hooks.py
105
106
107
108
109
110
111
112
113
114
115
async def on_merge_conflict(
    self,
    state: T,
    changes: list[dict[tuple[Hashable, ...], Change]],
    conflicts: dict[tuple[Hashable, ...], list[Change]],
) -> None:
    """
    Hook invoked when a merge conflict occurs.

    The default implementation does nothing.

    Args:
        state: Old state of the graph.
        changes: Changes that will be applied to the state.
        conflicts: Conflicts that occurred during the merge process.
    """
    return None

on_merge_end(state, result_states, changes, merged_state) async

Called when the merge process ends.

Parameters:

Name Type Description Default
state T

The old state of the graph.

required
result_states list[T]

The result states of the nodes.

required
changes list[dict[tuple[Hashable, ...], Change]]

The changes that have been applied to the state.

required
merged_state T

The new merged state of the graph.

required
Source code in src/edgygraph/graph/hooks.py
118
119
120
121
122
123
124
125
126
127
128
129
async def on_merge_end(
    self,
    state: T,
    result_states: list[T],
    changes: list[dict[tuple[Hashable, ...], Change]],
    merged_state: T,
) -> None:
    """
    Hook invoked when the merge process ends.

    The default implementation does nothing.

    Args:
        state: Old state of the graph.
        result_states: Result states of the nodes.
        changes: Changes that have been applied to the state.
        merged_state: New merged state of the graph.
    """
    return None

on_graph_end(state, shared) async

Called when the graph execution ends.

Parameters:

Name Type Description Default
state T

The final state of the graph.

required
shared S

The final shared data.

required
Source code in src/edgygraph/graph/hooks.py
132
133
134
135
136
137
138
139
140
141
async def on_graph_end(self, state: T, shared: S) -> None:
    """
    Hook invoked when the graph execution ends.

    The default implementation does nothing.

    Args:
        state: Final state of the graph.
        shared: Final shared data.
    """
    return None

on_error(error, state, shared) async

Called when an error occurs during the graph execution.

Parameters:

Name Type Description Default
error Exception

The error that occurred.

required

Returns:

Type Description
Exception | None

The error to raise, or None if no error should be raised.

Source code in src/edgygraph/graph/hooks.py
144
145
146
147
148
149
150
151
152
153
154
155
async def on_error(self, error: Exception, state: T, shared: S) -> Exception | None:
    """
    Called when an error occurs during the graph execution.

    The default implementation hands the error back unchanged.

    Args:
        error: The error that occurred.
        state: The state of the graph.
        shared: The shared state of the graph.

    Returns:
        The error to raise, or None if no error should be raised.
    """

    return error