Quick Start
Install:
These examples use the public Python facade over the native Rust graph engine. They intentionally avoid raw native handles, raw protocol ingress, and retired pure-Python runtime surfaces.
Create a graph:
from graphrefly import Graph
with Graph("counter") as graph:
count = graph.state(0, name="count")
doubled = graph.derived([count], lambda value: value * 2, name="doubled")
seen: list[int] = []
def on_message(msg) -> None:
if msg.kind == "DATA":
seen.append(msg.value)
with doubled.subscribe(on_message):
count.set(1)
count.set(2)
assert seen[-1] == 4
assert doubled.cache() == 4
Advanced Node Callback
Use Graph.node(...) when the callback needs the Ctx surface:
from graphrefly import Ctx, Graph
with Graph("advanced") as graph:
source = graph.state(1, name="source")
def add_ten(ctx: Ctx) -> None:
ctx.emit(ctx.data(0) + 10)
result = graph.node([source], add_ten, name="result")
with result.subscribe(lambda msg: None):
source.set(5)
assert result.cache() == 15
Async Boundary
GraphReFly does not own your async runtime. Install optional adapters when you want Trio or AnyIO convenience helpers:
import trio
from graphrefly import Graph, from_awaitable, trio_runner
async def fetch_value() -> int:
return 42
async def main() -> None:
graph = Graph("async-demo")
async with trio.open_nursery() as nursery:
node = from_awaitable(graph, trio_runner(nursery), fetch_value, name="value")
with node.subscribe(lambda msg: print(msg.kind, msg.value)):
await trio.lowlevel.checkpoint()