Part 2 of the Custom Geth Consensus Series

In Part 1, we covered the Engine API fundamentals — how to authenticate with Geth, trigger block building, and finalize blocks. Now we’ll wrap those same calls into a production-ready application with retry logic, health checks, and graceful shutdown. Full source code is on GitHub.

What We’re Building

A single-node consensus layer that:

  • Produces blocks continuously
  • Handles transient failures with exponential backoff
  • Exposes health checks for orchestration
  • Supports graceful shutdown
  • Provides Prometheus metrics

Application Structure

The application has four main components:

┌──────────────────────────────────────────────────────┐
│                    SingleNodeApp                     │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐  │
│  │ BlockBuilder │ │ StateManager │ │ HealthServer │  │
│  └──────┬───────┘ └──────┬───────┘ └──────────────┘  │
│         │                │                           │
│  ┌──────▼────────────────▼────────────────────────┐  │
│  │                    Run Loop                    │  │
│  │  GetPayload ──► Read State ──► FinalizeBlock   │  │
│  └────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────┘

Configuration

Define all configurable parameters:

type Config struct {
    InstanceID               string        // Unique node identifier
    EthClientURL             string        // Engine API endpoint (e.g., http://localhost:8551)
    JWTSecret                string        // Hex-encoded 32-byte secret
    PriorityFeeRecipient     string        // Address receiving priority fees
    EVMBuildDelay            time.Duration // Wait after ForkchoiceUpdated
    EVMBuildDelayEmptyBlocks time.Duration // Min time between empty blocks
    TxPoolPollingInterval    time.Duration // Poll interval when mempool empty
    HealthAddr               string        // Health endpoint address
}

Sensible defaults:

Parameter                  Default  Purpose
EVMBuildDelay              1ms      Time for Geth to include transactions
EVMBuildDelayEmptyBlocks   1min     Avoid spamming empty blocks
TxPoolPollingInterval      5ms      Check for new transactions

State Manager

Between building and finalizing a block, we need to persist the payload. Why not just pass the struct directly? Because in Part 3, this state moves to Redis, so we serialize it now to keep the interface consistent. The payload and EIP-7685 requests are encoded as msgpack + base64 strings.

package state

type BuildStep int

const (
    StepBuildBlock BuildStep = iota
    StepFinalizeBlock
)

type BlockBuildState struct {
    CurrentStep      BuildStep
    PayloadID        string
    ExecutionPayload string // Base64-encoded msgpack
    Requests         string // Base64-encoded msgpack (EIP-7685)
}

type StateManager interface {
    SaveBlockState(ctx context.Context, state *BlockBuildState) error
    GetBlockBuildState(ctx context.Context) BlockBuildState
    ResetBlockState(ctx context.Context) error
}

The StateManager interface is designed to be swappable. For single-node, LocalStateManager is a trivial in-memory implementation — see the full source. Part 3 will replace it with Redis-backed state.

Retry Logic with Exponential Backoff

Network issues and Geth restarts are common. Every Engine API call is wrapped in exponential backoff using cenkalti/backoff. Return backoff.Permanent(err) for errors that shouldn’t be retried (like INVALID payload status):

func retryWithBackoff(ctx context.Context, maxAttempts uint64, logger *slog.Logger, operation func() error) error {
    eb := backoff.NewExponentialBackOff()
    eb.InitialInterval = 200 * time.Millisecond
    eb.MaxInterval = 30 * time.Second

    b := backoff.WithMaxRetries(eb, maxAttempts)

    return backoff.Retry(func() error {
        select {
        case <-ctx.Done():
            return backoff.Permanent(ctx.Err())
        default:
            if err := operation(); err != nil {
                logger.Warn("Operation failed, retrying", "error", err)
                return err
            }
            return nil
        }
    }, backoff.WithContext(b, ctx))
}

You’ll see this used throughout the block builder below.

Block Builder

The block builder orchestrates the two-phase process from Part 1, with retry logic around every Engine API call.

type BlockBuilder struct {
    stateManager          state.StateManager
    engineCl              EngineClient
    logger                *slog.Logger
    buildDelay            time.Duration
    buildEmptyBlocksDelay time.Duration
    feeRecipient          common.Address
    executionHead         *state.ExecutionHead
}

var ErrEmptyBlock = errors.New("empty block skipped")

Phase 1: Building a Block

GetPayload triggers block assembly and saves the result to state. On startup (or restart), it queries Geth for the current chain head via SetExecutionHeadFromRPC so it knows where to build from.

When there are no pending transactions, Geth returns an empty block. Rather than finalizing it, we return ErrEmptyBlock and wait for buildEmptyBlocksDelay before trying again — this avoids spamming the chain with empty blocks.

func (bb *BlockBuilder) GetPayload(ctx context.Context) error {
    // On first call or after restart, query Geth for current head
    if bb.executionHead == nil {
        if err := bb.SetExecutionHeadFromRPC(ctx); err != nil {
            return fmt.Errorf("set execution head: %w", err)
        }
    }

    ts := uint64(time.Now().Unix())
    if ts <= bb.executionHead.BlockTime {
        ts = bb.executionHead.BlockTime + 1
    }

    // Start block building with retry
    var payloadID *engine.PayloadID
    err := retryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
        headHash := common.BytesToHash(bb.executionHead.BlockHash)

        fcs := engine.ForkchoiceStateV1{
            HeadBlockHash:      headHash,
            SafeBlockHash:      headHash,
            FinalizedBlockHash: headHash,
        }

        attrs := &engine.PayloadAttributes{
            Timestamp:             ts,
            Random:                headHash,
            SuggestedFeeRecipient: bb.feeRecipient,
            Withdrawals:           []*types.Withdrawal{},
            BeaconRoot:            &headHash,
        }

        response, err := bb.engineCl.ForkchoiceUpdatedV3(ctx, fcs, attrs)
        if err != nil {
            return err // Transient — will retry
        }
        if response.PayloadStatus.Status != engine.VALID {
            return backoff.Permanent(fmt.Errorf("invalid status: %s",
                response.PayloadStatus.Status))
        }
        payloadID = response.PayloadID
        return nil
    })
    if err != nil {
        return err
    }

    time.Sleep(bb.buildDelay)

    // Retrieve the built payload
    var payloadResp *engine.ExecutionPayloadEnvelope
    err = retryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
        var err error
        payloadResp, err = bb.engineCl.GetPayloadV5(ctx, *payloadID)
        return err
    })
    if err != nil {
        return err
    }

    // Don't finalize empty blocks
    if len(payloadResp.ExecutionPayload.Transactions) == 0 {
        time.Sleep(bb.buildEmptyBlocksDelay)
        return ErrEmptyBlock
    }

    // Serialize and save state for Phase 2
    payloadData, err := msgpack.Marshal(payloadResp.ExecutionPayload)
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }
    requestsData, err := msgpack.Marshal(payloadResp.Requests)
    if err != nil {
        return fmt.Errorf("marshal requests: %w", err)
    }

    return bb.stateManager.SaveBlockState(ctx, &state.BlockBuildState{
        CurrentStep:      state.StepFinalizeBlock,
        PayloadID:        payloadID.String(),
        ExecutionPayload: base64.StdEncoding.EncodeToString(payloadData),
        Requests:         base64.StdEncoding.EncodeToString(requestsData),
    })
}

Phase 2: Finalizing a Block

FinalizeBlock deserializes the payload from state, validates it, submits it to Geth via NewPayloadV4, then updates the fork choice to make it canonical.

func (bb *BlockBuilder) FinalizeBlock(
    ctx context.Context,
    payloadIDStr, executionPayloadStr, requestsStr string,
) error {
    payloadBytes, err := base64.StdEncoding.DecodeString(executionPayloadStr)
    if err != nil {
        return fmt.Errorf("decode payload: %w", err)
    }
    var payload engine.ExecutableData
    if err := msgpack.Unmarshal(payloadBytes, &payload); err != nil {
        return fmt.Errorf("unmarshal payload: %w", err)
    }

    var requests [][]byte
    if requestsStr != "" {
        requestsBytes, err := base64.StdEncoding.DecodeString(requestsStr)
        if err != nil {
            return fmt.Errorf("decode requests: %w", err)
        }
        if err := msgpack.Unmarshal(requestsBytes, &requests); err != nil {
            return fmt.Errorf("unmarshal requests: %w", err)
        }
    }

    if err := bb.validatePayload(payload); err != nil {
        return err
    }

    // Submit block to Geth
    parentHash := common.BytesToHash(bb.executionHead.BlockHash)
    err = retryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
        status, err := bb.engineCl.NewPayloadV4(ctx, payload,
            []common.Hash{}, &parentHash, requests)
        if err != nil {
            return err
        }
        if status.Status == engine.INVALID {
            msg := "unknown"
            if status.ValidationError != nil {
                msg = *status.ValidationError
            }
            return backoff.Permanent(fmt.Errorf("invalid: %s", msg))
        }
        return nil
    })
    if err != nil {
        return err
    }

    // Set as canonical head
    fcs := engine.ForkchoiceStateV1{
        HeadBlockHash:      payload.BlockHash,
        SafeBlockHash:      payload.BlockHash,
        FinalizedBlockHash: payload.BlockHash,
    }

    err = retryWithBackoff(ctx, maxAttempts, bb.logger, func() error {
        _, err := bb.engineCl.ForkchoiceUpdatedV3(ctx, fcs, nil)
        return err
    })
    if err != nil {
        return err
    }

    bb.executionHead = &state.ExecutionHead{
        BlockHeight: payload.Number,
        BlockHash:   payload.BlockHash[:],
        BlockTime:   payload.Timestamp,
    }

    return nil
}

The Main Application

Wire everything together:

type SingleNodeApp struct {
    logger            *slog.Logger
    cfg               Config
    blockBuilder      *BlockBuilder
    stateManager      *state.LocalStateManager
    appCtx            context.Context
    cancel            context.CancelFunc
    wg                sync.WaitGroup
    connectionRefused bool
    runLoopStopped    chan struct{}
}

func NewSingleNodeApp(
    parentCtx context.Context,
    cfg Config,
    logger *slog.Logger,
) (*SingleNodeApp, error) {
    ctx, cancel := context.WithCancel(parentCtx)

    // Decode JWT
    jwtBytes, err := hex.DecodeString(cfg.JWTSecret)
    if err != nil {
        cancel()
        return nil, err
    }

    // Create Engine API client
    engineCl, err := ethclient.NewEngineClient(ctx, cfg.EthClientURL, jwtBytes)
    if err != nil {
        cancel()
        return nil, err
    }

    stateMgr := state.NewLocalStateManager()
    bb := blockbuilder.NewBlockBuilder(stateMgr, engineCl, logger,
        cfg.EVMBuildDelay, cfg.EVMBuildDelayEmptyBlocks,
        common.HexToAddress(cfg.PriorityFeeRecipient))

    return &SingleNodeApp{
        logger:         logger,
        cfg:            cfg,
        blockBuilder:   bb,
        stateManager:   stateMgr,
        appCtx:         ctx,
        cancel:         cancel,
        runLoopStopped: make(chan struct{}),
    }, nil
}

Health Checks

Essential for Kubernetes/orchestration:

func (app *SingleNodeApp) healthHandler(w http.ResponseWriter, r *http.Request) {
    // Check if context cancelled
    if err := app.appCtx.Err(); err != nil {
        http.Error(w, "unavailable", http.StatusServiceUnavailable)
        return
    }

    // Check if Geth is reachable
    if app.connectionRefused {
        http.Error(w, "ethereum unavailable", http.StatusServiceUnavailable)
        return
    }

    // Check if run loop is alive
    select {
    case <-app.runLoopStopped:
        http.Error(w, "run loop stopped", http.StatusServiceUnavailable)
        return
    default:
    }

    w.WriteHeader(http.StatusOK)
    w.Write([]byte("OK"))
}

Track connection status:

func (app *SingleNodeApp) setConnectionStatus(err error) {
    if err == nil {
        app.connectionRefused = false
        return
    }
    if strings.Contains(err.Error(), "connection refused") {
        app.connectionRefused = true
        app.logger.Warn("Geth connection refused")
    }
}

The Run Loop

The heart of block production:

func (app *SingleNodeApp) Start() {
    // Launch health server
    app.wg.Add(1)
    go func() {
        defer app.wg.Done()
        mux := http.NewServeMux()
        mux.HandleFunc("/health", app.healthHandler)
        mux.Handle("/metrics", promhttp.Handler())

        server := &http.Server{Addr: app.cfg.HealthAddr, Handler: mux}

        go func() {
            <-app.appCtx.Done()
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            server.Shutdown(ctx)
        }()

        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            app.logger.Error("health server failed", "error", err)
        }
    }()

    // Start block production
    app.wg.Add(1)
    go func() {
        defer app.wg.Done()
        defer close(app.runLoopStopped)
        app.runLoop()
    }()
}

func (app *SingleNodeApp) runLoop() {
    app.stateManager.ResetBlockState(app.appCtx)

    for {
        select {
        case <-app.appCtx.Done():
            return
        default:
            err := app.produceBlock()
            app.setConnectionStatus(err)

            if errors.Is(err, ErrEmptyBlock) {
                time.Sleep(app.cfg.TxPoolPollingInterval)
                continue
            }
            if err != nil {
                app.logger.Error("block production failed", "error", err)
            }

            app.stateManager.ResetBlockState(app.appCtx)
        }
    }
}

func (app *SingleNodeApp) produceBlock() error {
    // Phase 1: Build
    if err := app.blockBuilder.GetPayload(app.appCtx); err != nil {
        return err
    }

    // Get state after build (named bs to avoid shadowing the state package)
    bs := app.stateManager.GetBlockBuildState(app.appCtx)

    // Phase 2: Finalize
    return app.blockBuilder.FinalizeBlock(
        app.appCtx, bs.PayloadID, bs.ExecutionPayload, bs.Requests)
}

Graceful Shutdown

Handle SIGTERM/SIGINT properly:

func (app *SingleNodeApp) Stop() {
    app.logger.Info("stopping...")
    app.cancel()

    // Wait with timeout
    done := make(chan struct{})
    go func() {
        app.wg.Wait()
        close(done)
    }()

    select {
    case <-done:
        app.logger.Info("shutdown complete")
    case <-time.After(5 * time.Second):
        app.logger.Warn("shutdown timed out")
    }
}

CLI Entry Point

The CLI uses urfave/cli to map flags to the Config struct, sets up signal handling for graceful shutdown, and wires everything together. See the full source — here’s the core:

ctx, cancel := signal.NotifyContext(context.Background(),
    os.Interrupt, syscall.SIGTERM)
defer cancel()

snode, err := NewSingleNodeApp(ctx, cfg, logger)
if err != nil {
    return err
}

snode.Start()
<-ctx.Done()
snode.Stop()

signal.NotifyContext catches SIGTERM/SIGINT and cancels the context, which propagates through the run loop and triggers Stop().

Running It

Start Geth using the same Docker setup from Part 1:

docker compose up geth

Then run the consensus layer:

cd 02-single-node
go run ./cmd \
    --instance-id "node-1" \
    --priority-fee-recipient "0xYourAddress"

The defaults connect to localhost:8551 and use the shared JWT secret from the repo.

You should see output like this:

level=INFO msg="Starting single-node consensus" instanceID=node-1
level=INFO msg="Health endpoint listening" addr=:8080
level=INFO msg="Initialized from Geth" component=BlockBuilder height=0 hash=0x40a4ba...

Once you send a transaction (e.g., with cast send), the node picks it up and finalizes a block:

level=INFO msg="Block finalized" component=BlockBuilder height=1 hash=0xf5c59c... txs=1

If there are no pending transactions, the node waits quietly until new ones arrive.

Metrics

The health server exposes a /metrics endpoint via promhttp.Handler() for Prometheus scraping. This gives you Go runtime metrics and any custom metrics you add. You can see it in the Start() method above:

mux.Handle("/metrics", promhttp.Handler())

Scrape http://localhost:8080/metrics to monitor your consensus layer.

What’s Next

We now have a production-ready single-node consensus layer. But what happens when this node fails? In Part 3: Distributed Consensus with Redis, PostgreSQL, and Member Nodes, we’ll add:

  • Leader election with Redis
  • Durable payload storage in PostgreSQL
  • An HTTP API for block sync
  • Horizontally scalable member nodes

Full source code: geth-consensus-tutorial | Based on mev-commit consensus layer