Skip to main content

Library Usage

Embed Laredo in your Go application for direct programmatic access to in-memory data.

Install the module

go get github.com/zourzouvillys/laredo

Minimal example

package main

import (
"log"
"time"

"github.com/zourzouvillys/laredo"
"github.com/zourzouvillys/laredo/source/pg"
"github.com/zourzouvillys/laredo/target/memory"
)

func main() {
engine, errs := laredo.NewEngine(
// Register a PostgreSQL source
laredo.WithSource("pg_main", pg.New(
pg.Connection("postgresql://user:pass@localhost:5432/mydb"),
pg.SlotMode(pg.Ephemeral),
)),

// Bind a table to an indexed in-memory target
laredo.WithPipeline("pg_main",
laredo.Table("public", "config_document"),
memory.NewIndexedTarget(
memory.LookupFields("instance_id", "key"),
),
),
)
if len(errs) > 0 {
log.Fatalf("config errors: %v", errs)
}

// Start baseline + streaming
engine.Start(context.Background())
defer engine.Stop(context.Background())

// Wait until data is ready (blocking)
engine.AwaitReady(30 * time.Second)

// Or use a callback instead:
// engine.OnReady(func() { log.Println("all pipelines ready") })

// Query the in-memory replica directly
target, ok := laredo.GetTarget[*memory.IndexedTarget](
engine, "pg_main", laredo.Table("public", "config_document"),
)
if !ok {
log.Fatal("target not found")
}

row, ok := target.Lookup("inst_abc", "settings/default")
if ok {
log.Printf("found: %v", row)
}

// Subscribe to live changes
target.Listen(func(old, new laredo.Row) {
log.Printf("change: %v -> %v", old, new)
})

// Block forever (or until signal)
select {}
}

Adding filters and transforms

laredo.WithPipeline("pg_main",
laredo.Table("public", "config_document"),
memory.NewIndexedTarget(
memory.LookupFields("instance_id", "key"),
memory.AddIndex(laredo.IndexDefinition{Name: "by_instance", Fields: []string{"instance_id"}}),
),
// Only sync active rows
laredo.PipelineFilterOpt(laredo.PipelineFilterFunc(
func(table laredo.TableIdentifier, row laredo.Row) bool {
return row.GetString("status") == "active"
},
)),
// Strip sensitive fields before storing
laredo.PipelineTransformOpt(laredo.PipelineTransformFunc(
func(table laredo.TableIdentifier, row laredo.Row) laredo.Row {
return row.Without("ssn", "internal_notes")
},
)),
laredo.BufferSize(1000),
laredo.ErrorPolicyOpt(laredo.ErrorIsolate),
),

Working with rows

Row is a map[string]Value with convenience accessors:

row, ok := target.Lookup("inst_abc", "settings/default")
if !ok {
return
}

// Get a value with presence check (distinguishes NULL from absent)
val, exists := row.Get("config_json")

// Get a string value (returns "" if missing or not a string)
name := row.GetString("name")

// Iterate columns in deterministic (sorted) order
for key := range row.Keys() {
fmt.Printf("%s = %v\n", key, row[key])
}

// Create a copy without sensitive fields
safe := row.Without("ssn", "internal_notes")

Attaching gRPC management

engine.Start(ctx)

grpcServer := service.New(engine,
service.Port(4001),
service.EnableOAM(true),
service.EnableQuery(true),
)
grpcServer.Start()

Next steps