Go Library API Reference

Full generated documentation is available on pkg.go.dev:

pkg.go.dev/github.com/zourzouvillys/laredo

This page summarizes the key packages and interfaces. For method signatures, types, and detailed godoc, see the link above.

Packages

Core

Package | Import path | Description
laredo (root) | github.com/zourzouvillys/laredo | All public interfaces, types, builder options, and NewEngine()

Sources

Package | Import path | Description
source/pg | github.com/zourzouvillys/laredo/source/pg | PostgreSQL logical replication source (ephemeral and stateful modes, publication management, reconnection)
source/kinesis | github.com/zourzouvillys/laredo/source/kinesis | S3 baseline + Kinesis change stream source

Targets

Package | Import path | Description
target/memory | github.com/zourzouvillys/laredo/target/memory | IndexedTarget (raw rows with secondary indexes) and CompiledTarget (domain objects via compiler function)
target/fanout | github.com/zourzouvillys/laredo/target/fanout | Replication fan-out target with in-memory state, change journal, snapshots, and embedded gRPC server
target/httpsync | github.com/zourzouvillys/laredo/target/httpsync | HTTP sync target (batched POST, retry, durability tracking)

Snapshots

Package | Import path | Description
snapshot/local | github.com/zourzouvillys/laredo/snapshot/local | Local disk snapshot store
snapshot/s3 | github.com/zourzouvillys/laredo/snapshot/s3 | S3 snapshot store
snapshot/jsonl | github.com/zourzouvillys/laredo/snapshot/jsonl | JSONL snapshot serializer

Pipeline components

Package | Import path | Description
filter | github.com/zourzouvillys/laredo/filter | Built-in pipeline filters: FieldEquals, FieldPrefix, FieldRegex
transform | github.com/zourzouvillys/laredo/transform | Built-in pipeline transforms: DropFields, RenameFields, AddTimestamp
deadletter | github.com/zourzouvillys/laredo/deadletter | Dead letter store interface and implementations

Services

Package | Import path | Description
service | github.com/zourzouvillys/laredo/service | gRPC server setup (Connect protocol)
service/oam | github.com/zourzouvillys/laredo/service/oam | OAM gRPC service (status, admin, snapshot management, dead letters, replay)
service/query | github.com/zourzouvillys/laredo/service/query | Query gRPC service (lookup, list, count, subscribe)
service/replication | github.com/zourzouvillys/laredo/service/replication | Fan-out replication gRPC service

Clients

Package | Import path | Description
client/fanout | github.com/zourzouvillys/laredo/client/fanout | Go client for the replication fan-out protocol

Configuration

Package | Import path | Description
config | github.com/zourzouvillys/laredo/config | HOCON config loading and validation

Core interfaces

Engine

Engine is the top-level orchestrator. It manages pipelines, coordinates startup (baseline or resume), streams changes, and handles shutdown.

type Engine interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	AwaitReady(timeout time.Duration) bool
	IsReady() bool
	IsSourceReady(sourceID string) bool
	OnReady(callback func())
	Reload(ctx context.Context, sourceID string, table TableIdentifier) error
	Pause(ctx context.Context, sourceID string) error
	Resume(ctx context.Context, sourceID string) error
	CreateSnapshot(ctx context.Context, userMeta map[string]Value) error
	Targets(sourceID string, table TableIdentifier) []SyncTarget
	Pipelines() []PipelineInfo
	SourceIDs() []string
	SourceInfo(sourceID string) (SourceRunInfo, bool)
	TableSchema(table TableIdentifier) []ColumnDefinition
	ResetSource(ctx context.Context, sourceID string, dropPublication bool) error
}

Create an engine with NewEngine() and a set of Option values. NewEngine() returns the engine together with any configuration errors, and the engine does not start until Start() is called.

SyncSource

SyncSource produces baseline snapshots and change streams from a data source. Sources are shared across pipelines: one PostgreSQL source can feed multiple tables into different targets.

type SyncSource interface {
	Init(ctx context.Context, config SourceConfig) (map[TableIdentifier][]ColumnDefinition, error)
	ValidateTables(ctx context.Context, tables []TableIdentifier) []ValidationError
	Baseline(ctx context.Context, tables []TableIdentifier, rowCallback func(TableIdentifier, Row)) (Position, error)
	Stream(ctx context.Context, from Position, handler ChangeHandler) error
	Ack(ctx context.Context, position Position) error
	SupportsResume() bool
	LastAckedPosition(ctx context.Context) (Position, error)
	ComparePositions(a, b Position) int
	PositionToString(p Position) string
	PositionFromString(s string) (Position, error)
	Pause(ctx context.Context) error
	Resume(ctx context.Context) error
	GetLag() LagInfo
	OrderingGuarantee() OrderingGuarantee
	State() SourceState
	Close(ctx context.Context) error
}
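ComparePositions, PositionToString, and PositionFromString imply that a source's positions are ordered and can round-trip through a string. As a rough illustration, the sketch below implements those three operations for a hypothetical position type, lsn, modeled on a PostgreSQL log sequence number written in its usual hi/lo hexadecimal text form. The lsn type and the lowercase function names are assumptions. The actual Position type and the pg source's encoding are not shown on this page.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// lsn is a hypothetical position type: a 64-bit offset into a change log.
type lsn uint64

// comparePositions returns -1, 0, or 1 when a is before, equal to, or after b.
func comparePositions(a, b lsn) int {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}

// positionToString renders the upper and lower 32 bits as "HI/LO" in hex.
func positionToString(p lsn) string {
	return fmt.Sprintf("%X/%X", uint32(p>>32), uint32(p))
}

// positionFromString parses the "HI/LO" form produced by positionToString.
func positionFromString(s string) (lsn, error) {
	hiStr, loStr, ok := strings.Cut(s, "/")
	if !ok {
		return 0, fmt.Errorf("invalid position %q: missing '/'", s)
	}
	hi, err := strconv.ParseUint(hiStr, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid position %q: %w", s, err)
	}
	lo, err := strconv.ParseUint(loStr, 16, 32)
	if err != nil {
		return 0, fmt.Errorf("invalid position %q: %w", s, err)
	}
	return lsn(hi<<32 | lo), nil
}

func main() {
	acked, err := positionFromString("16/B374D848")
	if err != nil {
		panic(err)
	}
	next := acked + 1
	fmt.Println(positionToString(acked), positionToString(next))
	fmt.Println(comparePositions(acked, next))
}
```

Keeping comparison inside the source means callers can order positions without knowing how a particular source encodes them.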

SyncTarget

SyncTarget consumes baseline rows and change events for a single table. Each pipeline has exactly one target instance.

type SyncTarget interface {
	OnInit(ctx context.Context, table TableIdentifier, columns []ColumnDefinition) error
	OnBaselineRow(ctx context.Context, table TableIdentifier, row Row) error
	OnBaselineComplete(ctx context.Context, table TableIdentifier) error
	OnInsert(ctx context.Context, table TableIdentifier, columns Row) error
	OnUpdate(ctx context.Context, table TableIdentifier, columns Row, identity Row) error
	OnDelete(ctx context.Context, table TableIdentifier, identity Row) error
	OnTruncate(ctx context.Context, table TableIdentifier) error
	IsDurable() bool
	OnSchemaChange(ctx context.Context, table TableIdentifier, oldColumns, newColumns []ColumnDefinition) SchemaChangeResponse
	ExportSnapshot(ctx context.Context) ([]SnapshotEntry, error)
	RestoreSnapshot(ctx context.Context, metadata TableSnapshotInfo, entries []SnapshotEntry) error
	SupportsConsistentSnapshot() bool
	OnClose(ctx context.Context, table TableIdentifier) error
}
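To make the change callbacks concrete, here is a minimal sketch of how a target might apply inserts, updates, deletes, and truncates to an in-memory map. It is deliberately simplified: Row is a hypothetical map type standing in for laredo.Row, the context and table parameters are dropped, and the sketch assumes that the identity row carries the key column. It does not implement the real SyncTarget interface and is not how target/memory works internally.

```go
package main

import "fmt"

// Row is a hypothetical stand-in for laredo.Row: column name to value.
type Row map[string]any

// mapTarget keeps the latest version of each row, keyed by one column.
// Key values are assumed to be comparable (strings, integers, and so on).
type mapTarget struct {
	keyColumn string
	rows      map[any]Row
}

func newMapTarget(keyColumn string) *mapTarget {
	return &mapTarget{keyColumn: keyColumn, rows: make(map[any]Row)}
}

// key extracts the key column from a row.
func (t *mapTarget) key(r Row) (any, error) {
	k, ok := r[t.keyColumn]
	if !ok {
		return nil, fmt.Errorf("row is missing key column %q", t.keyColumn)
	}
	return k, nil
}

// OnInsert stores a new row.
func (t *mapTarget) OnInsert(columns Row) error {
	k, err := t.key(columns)
	if err != nil {
		return err
	}
	t.rows[k] = columns
	return nil
}

// OnUpdate removes the row named by identity and stores the new version,
// which also handles an update that changes the key itself.
func (t *mapTarget) OnUpdate(columns, identity Row) error {
	oldKey, err := t.key(identity)
	if err != nil {
		return err
	}
	newKey, err := t.key(columns)
	if err != nil {
		return err
	}
	delete(t.rows, oldKey)
	t.rows[newKey] = columns
	return nil
}

// OnDelete removes the row named by identity.
func (t *mapTarget) OnDelete(identity Row) error {
	k, err := t.key(identity)
	if err != nil {
		return err
	}
	delete(t.rows, k)
	return nil
}

// OnTruncate drops every row.
func (t *mapTarget) OnTruncate() error {
	t.rows = make(map[any]Row)
	return nil
}

func main() {
	t := newMapTarget("id")
	_ = t.OnInsert(Row{"id": 1, "name": "ada"})
	_ = t.OnUpdate(Row{"id": 1, "name": "ada lovelace"}, Row{"id": 1})
	fmt.Println(len(t.rows), t.rows[1]["name"])
	_ = t.OnDelete(Row{"id": 1})
	fmt.Println(len(t.rows))
}
```

Passing the identity row separately from the new columns is what lets a target find the old row even when an update changes the key.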

EngineObserver

EngineObserver receives structured lifecycle and operational events from the engine. All methods are called synchronously from the engine goroutine and must not block.

The interface covers: source lifecycle, baseline progress, change streaming, ACK advancement, backpressure, snapshots, schema changes, lag updates, dead letters, TTL expiry, validation, and fan-out replication events.
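Because observer methods run on the engine goroutine and must not block, an observer that does slow work (logging to a remote system, for example) needs to hand events off quickly. One common approach, sketched below, is a buffered channel with a non-blocking send that counts events it has to drop. The Event type, asyncObserver, and its Notify method are hypothetical. The real EngineObserver has many typed methods rather than a single generic one.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Event is a hypothetical, simplified observer event.
type Event struct {
	Kind   string
	Detail string
}

// asyncObserver queues events for a separate consumer so that the
// caller (the engine goroutine) is never blocked.
type asyncObserver struct {
	events  chan Event
	dropped atomic.Int64 // events discarded because the queue was full
}

func newAsyncObserver(buffer int) *asyncObserver {
	return &asyncObserver{events: make(chan Event, buffer)}
}

// Notify enqueues the event if there is room and drops it otherwise.
// It returns immediately in both cases.
func (o *asyncObserver) Notify(e Event) {
	select {
	case o.events <- e:
	default:
		o.dropped.Add(1)
	}
}

func main() {
	obs := newAsyncObserver(2)

	// No consumer is running yet, so the third event overflows the queue.
	for i := 1; i <= 3; i++ {
		obs.Notify(Event{Kind: "lag", Detail: fmt.Sprintf("sample %d", i)})
	}

	close(obs.events)
	for e := range obs.events {
		fmt.Println(e.Kind, e.Detail)
	}
	fmt.Println("dropped:", obs.dropped.Load())
}
```

In a real program a dedicated goroutine would drain the channel continuously. Dropping on overflow trades completeness for the guarantee that the engine is never stalled, so the drop counter is worth exporting as a metric.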

Laredo provides two built-in implementations:

  • NullObserver: a no-op implementation for embedded users who do not need observability
  • CompositeObserver: fans out observer calls to multiple implementations

The metrics/prometheus and metrics/otel packages provide production-ready observer implementations.
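The no-op and fan-out roles described above are straightforward to picture with a one-method interface. The sketch below uses a hypothetical Observer interface with a single invented method, OnSourceConnected. The real EngineObserver method names and the constructors for NullObserver and CompositeObserver are documented on pkg.go.dev and are not reproduced here.

```go
package main

import "fmt"

// Observer is a hypothetical one-method stand-in for EngineObserver.
type Observer interface {
	OnSourceConnected(sourceID string)
}

// noop ignores every event, like a null observer.
type noop struct{}

func (noop) OnSourceConnected(string) {}

// recorder remembers which sources it was told about.
type recorder struct {
	seen []string
}

func (r *recorder) OnSourceConnected(sourceID string) {
	r.seen = append(r.seen, sourceID)
}

// fanOut forwards each call to every wrapped observer, in order.
type fanOut []Observer

func (f fanOut) OnSourceConnected(sourceID string) {
	for _, o := range f {
		o.OnSourceConnected(sourceID)
	}
}

func main() {
	metrics := &recorder{}
	audit := &recorder{}

	// The engine would hold a single Observer; fanOut hides the rest.
	var obs Observer = fanOut{noop{}, metrics, audit}
	obs.OnSourceConnected("pg_main")

	fmt.Println(metrics.seen, audit.seen)
}
```

Because the fan-out calls each observer in turn on the caller's goroutine, one slow observer delays the others, which is another reason each implementation should return quickly.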

Example: creating an engine

package main

import (
	"context"
	"log"
	"time"

	"github.com/zourzouvillys/laredo"
	"github.com/zourzouvillys/laredo/source/pg"
	"github.com/zourzouvillys/laredo/target/memory"
)

func main() {
	// NewEngine validates the configuration and returns any errors it finds.
	engine, errs := laredo.NewEngine(
		laredo.WithSource("pg_main", pg.New(
			pg.Connection("postgresql://user:pass@localhost:5432/mydb"),
			pg.SlotMode(pg.Ephemeral),
		)),
		laredo.WithPipeline("pg_main",
			laredo.Table("public", "users"),
			memory.NewIndexedTarget(
				memory.LookupFields("id"),
			),
		),
	)
	if len(errs) > 0 {
		log.Fatalf("config errors: %v", errs)
	}

	ctx := context.Background()
	if err := engine.Start(ctx); err != nil {
		log.Fatalf("start engine: %v", err)
	}
	defer engine.Stop(ctx)

	// Block until the engine reports ready or the timeout elapses.
	if !engine.AwaitReady(30 * time.Second) {
		log.Print("engine not ready after 30s")
	}
}

Example: querying a target

Use GetTarget to retrieve a typed target from a running engine:

target, ok := laredo.GetTarget[*memory.IndexedTarget](
	engine, "pg_main", laredo.Table("public", "users"),
)
if !ok {
	log.Fatal("target not found")
}

// Look up a row by the configured lookup field (id)
row, found := target.Lookup("user-123")
if found {
	log.Printf("name: %s", row.GetString("name"))
}

// List all rows
for _, row := range target.All() {
	log.Printf("row: %v", row)
}

// Subscribe to live changes
target.Listen(func(oldRow, newRow laredo.Row) {
	log.Printf("changed: %v -> %v", oldRow, newRow)
})

Further reading