Ingestion Pipeline Sample Code

Complete code for a small sample Stellar network ingestion pipeline that uses the Stellar Go Ingestion SDK to publish derived data to a remote message broker. It demonstrates event-driven, distributed processing with a sample microservice (a Python script) as the subscriber.

This example uses goczmq, the Go wrapper for ZeroMQ, which requires a few OS-dependent libraries to also be installed on the host machine.

Put these files in a directory, then compile and run with: `go build -o pipeline ./.; ./pipeline`


module example/pipeline

go 1.22

toolchain go1.22.1

require (
	github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58
	github.com/zeromq/goczmq v4.1.0+incompatible
)


package main

import (



// Application specifics

// AppPayment is the application-level payment record derived from a network
// payment operation. It is serialized with encoding/json using the default
// (Go field name) keys, so the field names below are part of the wire format
// seen by subscribers and must not be renamed casually.
type AppPayment struct {
	Timestamp       uint   // close time of the ledger that contained the payment
	BuyerAccountId  string // address of the payment's destination account
	SellerAccountId string // address of the payment's source account
	AssetCode       string // canonical string form of the asset that was paid
	Amount          string // payment amount rendered as a string
}

// General stream topology

// Message is the unit of data handed from one pipeline stage to the next.
// Payload is an opaque byte slice; each stage decides how to decode it.
type Message struct {
	Payload []byte
}

type Processor interface {
Process(context.Context, Message) error

type Publisher interface {
Subscribe(receiver Processor)

// Ingestion Pipeline Processors
type ZeroMQOutboundAdapter struct {
Publisher *goczmq.Sock

func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
_, err := adapter.Publisher.Write(msg.Payload)
return err

type AppPaymentTransformer struct {
processors []Processor
networkPassPhrase string

func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
transformer.processors = append(transformer.processors, receiver)

func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
ledgerCloseMeta := xdr.LedgerCloseMeta{}
err := ledgerCloseMeta.UnmarshalBinary(msg.Payload)
if err != nil {
return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta")

ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
if err != nil {
return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())

closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

// scan all transactions in a ledger for payments to derive new model from
transaction, err := ledgerTxReader.Read()
for ; err == nil; transaction, err = ledgerTxReader.Read() {
for _, op := range transaction.Envelope.Operations() {
switch op.Body.Type {
case xdr.OperationTypePayment:
networkPayment := op.Body.MustPaymentOp()
myPayment := AppPayment{
Timestamp: closeTime,
BuyerAccountId: networkPayment.Destination.Address(),
SellerAccountId: op.SourceAccount.Address(),
AssetCode: networkPayment.Asset.StringCanonical(),
Amount: amount.String(networkPayment.Amount),
jsonBytes, err := json.Marshal(myPayment)
if err != nil {
return err

for _, processor := range transformer.processors {
processor.Process(ctx, Message{Payload: jsonBytes})
if err != io.EOF {
return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
return nil

type CaptiveCoreInboundAdapter struct {
TomlParams ledgerbackend.CaptiveCoreTomlParams
processors []Processor
historyArchiveURLs []string
coreConfigNetworkTemplate []byte

func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) {
adapter.processors = append(adapter.processors, receiver)

func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error {
// Setup captive core config to use the Pubnet network
captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams)
if err != nil {
return errors.Wrap(err, "error creating captive core toml")

captiveConfig := ledgerbackend.CaptiveCoreConfig{
BinaryPath: adapter.TomlParams.CoreBinaryPath,
HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs,
Context: ctx,
Toml: captiveCoreToml,

// Create a new captive core backend, the origin of ingestion pipeline
captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig)
if err != nil {
return errors.Wrap(err, "error creating captive core instance")

// Create a client to the network's history archives
historyAchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
ConnectOptions: storage.ConnectOptions{
UserAgent: "my_app",
Context: ctx,

if err != nil {
return errors.Wrap(err, "error creating history archive client")

// Acquire the most recent ledger on network
latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyAchive)
if err != nil {
return errors.Wrap(err, "error getting latest ledger")

// Tell the captive core instance to emit LedgerCloseMeta starting at
// latest network ledger and continuing indefintely, streaming.
if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil {
return errors.Wrap(err, "error preparing captive core ledger range")

// Run endless loop that receives LedgerCloseMeta from captive core for each new
// ledger generated by the network and pushes it to next processors in pipeline
for nextLedger := latestNetworkLedger; true; nextLedger++ {
ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger)
if err != nil {
return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)

payload, err := ledgerCloseMeta.MarshalBinary()
if err != nil {
return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger)

log.Printf("Processing Ledger %v", nextLedger)
for _, processor := range adapter.processors {
if err := processor.Process(ctx, Message{Payload: payload}); err != nil {
return errors.Wrapf(err, "failed to process ledger %d", nextLedger)
return nil

func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer stop()

// create the inbound 'source of origin' adapter,
// imports network data using this captive core config
networkInboundAdapter := &CaptiveCoreInboundAdapter{
historyArchiveURLs: network.TestNetworkhistoryArchiveURLs,
coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig,
TomlParams: ledgerbackend.CaptiveCoreTomlParams{
NetworkPassphrase: network.TestNetworkPassphrase,
HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs,
UseDB: true,
CoreBinaryPath: "stellar-core", // assumes you have installed stellar-core on your o/s PATH

// create the app transformer to convert network data to application data model
appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase}

// create the outbound adapter, this is the end point of the pipeline
// publishes application data model as messages to a broker
publisher, err := goczmq.NewPub("tcp://")
if err != nil {
log.Printf("error creating 0MQ publisher: %v", err)
defer publisher.Destroy()
outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

// wire up the ingestion pipeline and let it run
log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx))

A Python script demonstrating how we now have distributed processing and an event-driven architecture, by leveraging the MQ broker to push the derived application payment data model out to other microservices. Make sure to run pip install pyzmq first.

import sys
import zmq
import json

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting next 10 payments from pipeline ...")

# A SUB socket receives nothing until it is connected to a publisher and has
# at least one subscription. The endpoint must match the one the pipeline's
# publisher binds to; the empty prefix subscribes to every message.
socket.connect("tcp://127.0.0.1:5555")
socket.setsockopt(zmq.SUBSCRIBE, b"")

# Block for each of the next 10 published payments and pretty-print its JSON.
for request in range(10):
    message = socket.recv()
    json_object = json.loads(message)
    json_formatted_str = json.dumps(json_object, indent=2)
    print(f"Received payment:\n\n{json_formatted_str}")