
LangGraph Go

How github.com/smallnest/langgraphgo (v0.8.5) fits into the arcnem-vision Go services — the graph execution engine that orchestrates agent workflows for document processing.

Requires Go 1.25+. Depends on github.com/tmc/langchaingo for LLM and tool interfaces (see the LangChain Go guide).

| Service | What langgraphgo does there |
| --- | --- |
| models/agents | Builds and executes agent workflow graphs loaded from the database. Inngest triggers the job; langgraphgo runs the graph. |
| Future | Streaming execution results back to clients. Multi-agent supervisor patterns. Human-in-the-loop approval flows. |

What langgraphgo does NOT cover: event scheduling and durable execution (that’s Inngest), LLM calls and tool interfaces (that’s langchaingo), and database access (that’s GORM).


LangGraphGo models workflows as directed graphs where state flows through nodes connected by edges.

START ──> [node_a] ──> [node_b] ──?──> [node_c] ──> END
                                  └──> [node_d] ──> END
| Concept | What it is |
| --- | --- |
| StateGraph[S] | The graph definition. S is your state type (struct or map[string]any). |
| Node | A function func(ctx, state S) (S, error) that transforms state. |
| Edge | A static connection from one node to another. |
| Conditional Edge | A dynamic connection where a function inspects state and returns the next node name. |
| END | Special constant (graph.END). An edge to END terminates the graph. |
| StateRunnable[S] | The compiled graph. Call .Invoke(ctx, state) to execute. |

// ProcessingState is the typed state passed from node to node.
type ProcessingState struct {
	ObjectKey   string    `json:"object_key"`
	Description string    `json:"description"`
	Embedding   []float32 `json:"embedding"`
	DocumentID  string    `json:"document_id"`
}

g := graph.NewStateGraph[ProcessingState]()

// Each node receives the current state and returns the updated state.
g.AddNode("describe", "Generate image description", func(ctx context.Context, state ProcessingState) (ProcessingState, error) {
	description, err := describeImage(ctx, state.ObjectKey)
	if err != nil {
		return state, err
	}
	state.Description = description
	return state, nil
})
g.AddNode("embed", "Create embedding", func(ctx context.Context, state ProcessingState) (ProcessingState, error) {
	vec, err := embedder.EmbedQuery(ctx, state.Description)
	if err != nil {
		return state, err
	}
	state.Embedding = vec
	return state, nil
})

// Wire the pipeline: describe -> embed -> END.
g.SetEntryPoint("describe")
g.AddEdge("describe", "embed")
g.AddEdge("embed", graph.END)

// Check both errors instead of discarding them.
runnable, err := g.Compile()
if err != nil {
	return err
}
result, err := runnable.Invoke(ctx, ProcessingState{
	ObjectKey:  "uploads/abc123.jpg",
	DocumentID: "doc-uuid",
})
if err != nil {
	return err
}
fmt.Println(result.Description) // set by the "describe" node

Conditional edges let the graph branch based on state.

// Assumes the state type has a DocType field (not part of ProcessingState as defined above).
g.AddConditionalEdge("classify", func(ctx context.Context, state ProcessingState) string {
	if state.DocType == "image" {
		return "process_image"
	}
	return "process_text"
})

When multiple edges fan out from a single node, the targets run in parallel automatically.

// Fan-out: both "ocr" and "caption" run concurrently after "load"
g.AddEdge("load", "ocr")
g.AddEdge("load", "caption")
// Fan-in: both converge to "combine"
g.AddEdge("ocr", "combine")
g.AddEdge("caption", "combine")

When nodes run in parallel and both modify state, the last result wins by default. Use a state merger or schema with reducers for smarter merging.
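
To make the default concrete, here is a minimal sketch that uses only the Go standard library, not langgraphgo itself. `DocState`, `runBranches`, and `mergeStates` are hypothetical names invented for this illustration, and the library's own merger hooks may look different. The sketch runs two branches concurrently on copies of the state, then contrasts keeping the last result with a field-wise merge.

```go
package main

import (
	"fmt"
	"sync"
)

// DocState is a hypothetical state type for this sketch.
type DocState struct {
	Text    string // set by the OCR branch
	Caption string // set by the caption branch
}

// runBranches runs each branch on its own copy of the state and
// returns the results in branch order.
func runBranches(base DocState, branches ...func(DocState) DocState) []DocState {
	results := make([]DocState, len(branches))
	var wg sync.WaitGroup
	for i, branch := range branches {
		wg.Add(1)
		go func(i int, branch func(DocState) DocState) {
			defer wg.Done()
			results[i] = branch(base) // each goroutine writes only its own slot
		}(i, branch)
	}
	wg.Wait()
	return results
}

// mergeStates keeps every non-empty field from each branch result
// instead of letting one whole result replace the others.
func mergeStates(base DocState, results ...DocState) DocState {
	merged := base
	for _, r := range results {
		if r.Text != "" {
			merged.Text = r.Text
		}
		if r.Caption != "" {
			merged.Caption = r.Caption
		}
	}
	return merged
}

func ocr(s DocState) DocState     { s.Text = "invoice 42"; return s }
func caption(s DocState) DocState { s.Caption = "a scanned invoice"; return s }

func main() {
	results := runBranches(DocState{}, ocr, caption)

	// Whole-state overwrite: only the last branch's fields survive.
	lastWins := results[len(results)-1]
	fmt.Printf("last wins: %+v\n", lastWins)

	// Field-wise merge: both branches contribute.
	fmt.Printf("merged:    %+v\n", mergeStates(DocState{}, results...))
}
```

In the overwrite case the OCR text is lost because the caption branch never set it; the field-wise merge keeps both values.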


Schemas define how node outputs merge into the running state.

Built-in reducers:

| Reducer | Behavior |
| --- | --- |
| graph.OverwriteReducer | New value replaces old (default) |
| graph.AppendReducer | Appends to slice |
| graph.AddMessages | Appends messages with ID-based upsert |
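
The three behaviors in the table can be sketched in plain Go. This is an illustration of the semantics only: `Message`, `overwrite`, `appendAll`, and `upsertMessages` are hypothetical names, and the real langgraphgo reducers have their own signatures, which are not reproduced here.

```go
package main

import "fmt"

// Message is a hypothetical message type; ID is the upsert key.
type Message struct {
	ID   string
	Text string
}

// overwrite discards the current value and returns the new one.
func overwrite[T any](_, next T) T { return next }

// appendAll returns a new slice with the current items followed by the new ones.
func appendAll[T any](current, next []T) []T {
	out := make([]T, 0, len(current)+len(next))
	out = append(out, current...)
	return append(out, next...)
}

// upsertMessages replaces messages whose ID already exists and appends the rest.
func upsertMessages(current, next []Message) []Message {
	out := make([]Message, len(current))
	copy(out, current)
	index := make(map[string]int, len(out))
	for i, m := range out {
		index[m.ID] = i
	}
	for _, m := range next {
		if i, ok := index[m.ID]; ok {
			out[i] = m // same ID: update in place
			continue
		}
		index[m.ID] = len(out)
		out = append(out, m)
	}
	return out
}

func main() {
	fmt.Println(overwrite("draft", "final"))
	fmt.Println(appendAll([]string{"ocr"}, []string{"caption", "embed"}))

	history := []Message{{ID: "1", Text: "hello"}, {ID: "2", Text: "thinking"}}
	update := []Message{{ID: "2", Text: "done"}, {ID: "3", Text: "next"}}
	fmt.Println(upsertMessages(history, update))
}
```

The upsert case is the one that differs from a plain append: the message with ID "2" is updated rather than duplicated, so the history keeps three entries instead of four.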

// Retry policy set on the graph as a whole.
g.SetRetryPolicy(&graph.RetryPolicy{
	MaxRetries:      3,
	BackoffStrategy: graph.ExponentialBackoff,
	RetryableErrors: []string{"timeout", "rate limit", "503"},
})

// Retry settings for a single node.
g.AddNodeWithRetry("call_api", "Call external API", callApiFn, &graph.RetryConfig{
	MaxAttempts:   5,
	InitialDelay:  200 * time.Millisecond,
	MaxDelay:      10 * time.Second,
	BackoffFactor: 2.0,
})

// Circuit breaker around a single node.
g.AddNodeWithCircuitBreaker("external_api", "Call external API", callExternalFn, graph.CircuitBreakerConfig{
	FailureThreshold: 5,
	SuccessThreshold: 2,
	Timeout:          30 * time.Second,
})
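
To see what the `RetryConfig` values above imply, the sketch below computes a capped exponential backoff schedule with the standard library only. It assumes the common formula `InitialDelay * BackoffFactor^(attempt-1)`, capped at `MaxDelay`; whether langgraphgo uses exactly this formula, or adds jitter, is not confirmed here. `backoffDelay` and `exampleSchedule` are hypothetical helper names.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay returns the wait before retry number attempt (1-based):
// initial * factor^(attempt-1), capped at maxDelay.
func backoffDelay(attempt int, initial, maxDelay time.Duration, factor float64) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := float64(initial) * math.Pow(factor, float64(attempt-1))
	if d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

// exampleSchedule lists the waits for the first n retries using the values
// from the AddNodeWithRetry example: 200ms initial, 10s cap, factor 2.0.
func exampleSchedule(n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	for attempt := 1; attempt <= n; attempt++ {
		delays = append(delays, backoffDelay(attempt, 200*time.Millisecond, 10*time.Second, 2.0))
	}
	return delays
}

func main() {
	for i, d := range exampleSchedule(7) {
		fmt.Printf("retry %d: wait %v\n", i+1, d)
	}
}
```

Under this assumed formula the waits double from 200ms, and the 10s cap would first apply on the seventh retry, so a node limited to five attempts never reaches it.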

Pause graph execution at specific nodes for human approval.

config := &graph.Config{
	InterruptBefore: []string{"dangerous_action"},
}
state, err := runnable.InvokeWithConfig(ctx, initialState, config)

// errors.As also matches an interrupt that has been wrapped by another error.
var gi *graph.GraphInterrupt
if errors.As(err, &gi) {
	// Show state to user for approval, then resume from the interrupted node.
	resumeConfig := &graph.Config{
		ResumeFrom: []string{gi.Node},
	}
	state, err = runnable.InvokeWithConfig(ctx, state, resumeConfig)
}
if err != nil {
	return err
}

Save and resume graph execution across process restarts.

g := graph.NewCheckpointableStateGraph[map[string]any]()
g.SetCheckpointConfig(graph.CheckpointConfig{
	Store:          graph.NewMemoryCheckpointStore(),
	AutoSave:       true,
	MaxCheckpoints: 20,
})

Available stores: Memory, File, Redis, PostgreSQL, SQLite.


| Agent | Constructor | When to use |
| --- | --- | --- |
| ReAct | prebuilt.CreateReactAgentMap() | Reason-Act loop with tools |
| CreateAgent | prebuilt.CreateAgentMap() | Configurable agent with system messages |
| Supervisor | prebuilt.CreateSupervisorMap() | Multi-agent orchestration |
| ChatAgent | prebuilt.CreateChatAgent() | Multi-turn conversation |
| ReflectionAgent | prebuilt.CreateReflectionAgent() | Self-improving output |
| PlanningAgent | prebuilt.CreatePlanningAgent() | Plan-then-execute workflows |

LangGraphGo has a built-in adapter to convert MCP tools into langchaingo tools:

import mcpadapter "github.com/smallnest/langgraphgo/adapter/mcp"

// Error handling is omitted for brevity; check err after each call in real code.
client, err := mcpadapter.NewClientFromConfig(ctx, "./mcp-config.json")
mcpTools, err := mcpadapter.MCPToTools(ctx, client)
agent, _ := prebuilt.CreateAgentMap(model, mcpTools, 20)

In Arcnem Vision, agent graphs are defined in the database, not in code. The DB schema (agent_graphs, agent_graph_nodes, agent_graph_edges) stores the graph structure. At runtime, we load a Snapshot and build a langgraphgo StateGraph from it.

// Simplified excerpt: how nodeKey is derived from each snapshot node is elided.
func BuildGraph(snapshot *Snapshot, factory NodeFuncFactory) (*StateGraph, error) {
	g := graphlib.NewStateGraph[map[string]any]()

	// One graph node per snapshot node; the factory maps a stored node to its function.
	for _, snapshotNode := range snapshot.Nodes {
		nodeFn, err := factory(snapshotNode)
		if err != nil {
			return nil, err
		}
		g.AddNode(nodeKey, snapshotNode.Node.NodeType, nodeFn)
	}

	g.SetEntryPoint(snapshot.AgentGraph.EntryNode)
	for _, edge := range snapshot.Edges {
		g.AddEdge(edge.FromNode, edge.ToNode)
	}
	return g, nil
}
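
Because the structure comes from database rows rather than compiled code, it can help to check a snapshot before building a graph from it. The sketch below is one possible pre-flight check, written against hypothetical types (`GraphSnapshot`, `EdgeRow`) rather than the actual Arcnem Vision `Snapshot`; it assumes `"END"` is the reserved terminal name, as noted in the gotchas list.

```go
package main

import "fmt"

// endNode stands in for graph.END in this sketch.
const endNode = "END"

// EdgeRow and GraphSnapshot are hypothetical, simplified stand-ins
// for rows loaded from agent_graph_edges and agent_graph_nodes.
type EdgeRow struct{ From, To string }

type GraphSnapshot struct {
	EntryNode string
	NodeKeys  []string
	Edges     []EdgeRow
}

// validateSnapshot reports the first structural problem it finds:
// reserved or duplicate node keys, a missing entry node, or an edge
// that points at a node that was never defined.
func validateSnapshot(s GraphSnapshot) error {
	known := make(map[string]bool, len(s.NodeKeys))
	for _, k := range s.NodeKeys {
		if k == endNode {
			return fmt.Errorf("node key %q is reserved", k)
		}
		if known[k] {
			return fmt.Errorf("duplicate node key %q", k)
		}
		known[k] = true
	}
	if !known[s.EntryNode] {
		return fmt.Errorf("entry node %q is not defined", s.EntryNode)
	}
	for _, e := range s.Edges {
		if !known[e.From] {
			return fmt.Errorf("edge source %q is not defined", e.From)
		}
		if e.To != endNode && !known[e.To] {
			return fmt.Errorf("edge target %q is not defined", e.To)
		}
	}
	return nil
}

func main() {
	good := GraphSnapshot{
		EntryNode: "describe",
		NodeKeys:  []string{"describe", "embed"},
		Edges:     []EdgeRow{{"describe", "embed"}, {"embed", endNode}},
	}
	fmt.Println("good snapshot:", validateSnapshot(good))

	bad := good
	bad.Edges = []EdgeRow{{"describe", "store"}}
	fmt.Println("bad snapshot: ", validateSnapshot(bad))
}
```

Running a check like this before `BuildGraph` surfaces a bad row as a descriptive error instead of a failure later at compile or execution time.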

| Pattern | When to use | Arcnem Vision example |
| --- | --- | --- |
| Basic StateGraph | Fixed, simple pipeline | Describe → embed → store |
| Conditional edges | Branch based on content | Route by document type |
| Parallel execution | Independent steps | OCR + caption generation |
| Checkpointing | Long-running or crash-sensitive | Multi-step document processing |
| Streaming | Real-time progress updates | Processing status to client |
| Interrupts | Human approval needed | Low-confidence classifications |
| ReAct agent | Open-ended tool use | “Find similar images and explain why” |
| Supervisor | Multi-agent coordination | Image processor + search agent |
| Schema-driven (DB) | Per-device configurable workflows | Different graphs per device |

  1. map[string]any requires type assertions everywhere. Prefer typed state for new graphs.
  2. Parallel node state merging. Without a schema or merger, the last-finishing parallel node’s output overwrites everything.
  3. Conditional edges replace static edges. Don’t mix both from the same source node.
  4. graph.END is the string "END". Don’t name a node “END”.
  5. Compilation is cheap. You can compile per-request if the graph is built dynamically.
  6. Node functions must be goroutine-safe when using parallel execution.
  7. Inngest steps vs langgraphgo nodes are different layers. Don’t confuse the two retry mechanisms.
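
For the first point in the list above, graphs that still use `map[string]any` state can at least centralize their type assertions. The helper below is a small sketch using only the standard library; `get` is a hypothetical name and is not part of langgraphgo.

```go
package main

import "fmt"

// get reads key from a map-based state and asserts the value to T,
// returning an error instead of panicking on a missing key or wrong type.
func get[T any](state map[string]any, key string) (T, error) {
	var zero T
	raw, ok := state[key]
	if !ok {
		return zero, fmt.Errorf("state key %q is missing", key)
	}
	v, ok := raw.(T)
	if !ok {
		return zero, fmt.Errorf("state key %q has type %T, want %T", key, raw, zero)
	}
	return v, nil
}

func main() {
	state := map[string]any{
		"object_key": "uploads/abc123.jpg",
		"page_count": 3,
	}

	key, err := get[string](state, "object_key")
	fmt.Println(key, err)

	// Wrong type: the value is a string, not an int.
	_, err = get[int](state, "object_key")
	fmt.Println(err)

	// Missing key.
	_, err = get[string](state, "description")
	fmt.Println(err)
}
```

A node function can then return the error from `get` directly, which keeps a malformed state from turning into a runtime panic inside the graph.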