Skip to main content

Ingestion Pipeline Sample Code

Complete code for a small sample of a Stellar network ingestion pipeline that uses the Stellar Go Ingestion SDK to publish derived data to a remote message broker. It demonstrates event-driven, distributed processing with a sample microservice (a Python script) as the subscriber.

This example uses goczmq, the Go wrapper SDK for ZeroMQ, which requires a few OS-dependent libraries to also be installed on the host machine.

Put these files in a directory, resolve the module dependencies with go mod tidy, then compile and run with go build -o pipeline ./.; ./pipeline

go.mod​

module example/pipeline

go 1.22

toolchain go1.22.1

require (
github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58
github.com/zeromq/goczmq v4.1.0+incompatible
)

main.go​

package main

import (
"context"
"encoding/json"
"io"
"log"
"os"
"os/signal"

"github.com/pkg/errors"
"github.com/stellar/go/amount"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/datastore"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/zeromq/goczmq"
)

// Application specifics
// AppPayment is the application-level data model derived from a network
// payment operation. It is marshalled to JSON and handed to the downstream
// processors, so the field names double as the JSON keys.
type AppPayment struct {
// Timestamp is the close time taken from the header of the ledger that
// contained the payment.
Timestamp uint
// BuyerAccountId is filled from the payment operation's destination address.
BuyerAccountId string
// SellerAccountId is filled from the payment operation's source account address.
SellerAccountId string
// AssetCode is the canonical string form of the asset that was paid.
AssetCode string
// Amount is the payment amount rendered as a string.
Amount string
}

// General stream topology
// Message is the unit of data passed between pipeline stages.
type Message struct {
// Payload is the opaque, already-encoded content; in this file it carries
// either a binary-encoded LedgerCloseMeta or a JSON-encoded AppPayment.
Payload []byte
}

// Processor is a pipeline stage that consumes one Message at a time.
type Processor interface {
// Process handles a single message and reports any failure as an error.
Process(context.Context, Message) error
}

// Publisher is a pipeline stage that emits messages to downstream processors.
type Publisher interface {
// Subscribe registers receiver as a downstream consumer of this stage.
Subscribe(receiver Processor)
}

// Ingestion Pipeline Processors
// ZeroMQOutboundAdapter is the pipeline end point: it writes every message
// payload it receives to a ZeroMQ socket.
type ZeroMQOutboundAdapter struct {
// Publisher is the ZeroMQ socket that payloads are written to.
Publisher *goczmq.Sock
}

// Process writes msg.Payload to the adapter's ZeroMQ socket and returns the
// socket's write error, if any. The context is accepted to satisfy Processor
// but is not consulted.
func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error {
	if _, err := adapter.Publisher.Write(msg.Payload); err != nil {
		return err
	}
	return nil
}

// AppPaymentTransformer is the pipeline stage that turns ledger data into
// AppPayment messages and forwards them to its subscribers.
type AppPaymentTransformer struct {
// processors holds the downstream receivers registered through Subscribe.
processors []Processor
// networkPassPhrase is handed to the ledger transaction reader when decoding
// a ledger.
networkPassPhrase string
}

// Subscribe appends receiver to the list of downstream processors that will be
// handed every payment message produced by this transformer.
func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) {
	subscribers := append(transformer.processors, receiver)
	transformer.processors = subscribers
}

// Process decodes msg.Payload as a binary LedgerCloseMeta, scans every
// transaction of that ledger for payment operations, converts each payment into
// an AppPayment and forwards its JSON encoding to all subscribed processors.
//
// It returns an error when the payload cannot be decoded, when the ledger's
// transactions cannot be read, when a payment cannot be marshalled to JSON, or
// when a downstream processor rejects a message.
func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error {
	ledgerCloseMeta := xdr.LedgerCloseMeta{}
	if err := ledgerCloseMeta.UnmarshalBinary(msg.Payload); err != nil {
		return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta")
	}

	ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta)
	if err != nil {
		return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence())
	}

	closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime)

	// scan all transactions in a ledger for payments to derive new model from
	for {
		transaction, err := ledgerTxReader.Read()
		if err == io.EOF {
			// every transaction of the ledger has been consumed
			return nil
		}
		if err != nil {
			return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence())
		}

		for _, op := range transaction.Envelope.Operations() {
			if op.Body.Type != xdr.OperationTypePayment {
				continue
			}
			networkPayment := op.Body.MustPaymentOp()

			// The operation-level source account is optional. When it is
			// absent the operation runs on behalf of the transaction's source
			// account, so fall back to that instead of dereferencing nil.
			sourceAccount := transaction.Envelope.SourceAccount()
			if op.SourceAccount != nil {
				sourceAccount = *op.SourceAccount
			}

			myPayment := AppPayment{
				Timestamp:       closeTime,
				BuyerAccountId:  networkPayment.Destination.Address(),
				SellerAccountId: sourceAccount.Address(),
				AssetCode:       networkPayment.Asset.StringCanonical(),
				Amount:          amount.String(networkPayment.Amount),
			}
			jsonBytes, err := json.Marshal(myPayment)
			if err != nil {
				return err
			}

			// Surface downstream failures instead of silently dropping the
			// payment; the caller decides whether to stop the pipeline.
			for _, processor := range transformer.processors {
				if err := processor.Process(ctx, Message{Payload: jsonBytes}); err != nil {
					return errors.Wrapf(err, "failed to forward payment from ledger %v", ledgerCloseMeta.LedgerSequence())
				}
			}
		}
	}
}

// CaptiveCoreInboundAdapter is the origin of the pipeline: it runs a captive
// core backend and pushes each ledger it produces to its subscribers.
type CaptiveCoreInboundAdapter struct {
// TomlParams parameterizes the captive core toml; its CoreBinaryPath and
// HistoryArchiveURLs are also copied into the captive core config.
TomlParams ledgerbackend.CaptiveCoreTomlParams
// processors holds the downstream receivers registered through Subscribe.
processors []Processor
// historyArchiveURLs are the archives queried for the latest ledger sequence.
historyArchiveURLs []string
// coreConfigNetworkTemplate is the raw toml data the captive core
// configuration is built from.
coreConfigNetworkTemplate []byte
}

// Subscribe appends receiver to the list of downstream processors that will be
// handed every ledger emitted by this adapter.
func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) {
	subscribers := append(adapter.processors, receiver)
	adapter.processors = subscribers
}

// Run starts a captive core backend, looks up the most recent ledger sequence
// in the history archives and then streams every ledger from that sequence
// onwards to the subscribed processors.
//
// Run only returns on failure (including ctx being cancelled while a ledger is
// being fetched); the returned error is always non-nil and wrapped with the
// step that failed. The captive core backend is closed before Run returns.
func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error {
	// Setup captive core config for the network described by the adapter's
	// template and toml parameters
	captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams)
	if err != nil {
		return errors.Wrap(err, "error creating captive core toml")
	}

	captiveConfig := ledgerbackend.CaptiveCoreConfig{
		BinaryPath:         adapter.TomlParams.CoreBinaryPath,
		HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs,
		Context:            ctx,
		Toml:               captiveCoreToml,
	}

	// Create a new captive core backend, the origin of ingestion pipeline
	captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig)
	if err != nil {
		return errors.Wrap(err, "error creating captive core instance")
	}
	// Release the backend on every exit path so that an error further down
	// does not leave its resources behind.
	defer func() {
		if closeErr := captiveBackend.Close(); closeErr != nil {
			log.Printf("error closing captive core instance: %v", closeErr)
		}
	}()

	// Create a client to the network's history archives
	historyArchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{
		ConnectOptions: storage.ConnectOptions{
			UserAgent: "my_app",
			Context:   ctx,
		},
	})
	if err != nil {
		return errors.Wrap(err, "error creating history archive client")
	}

	// Acquire the most recent ledger on network
	latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyArchive)
	if err != nil {
		return errors.Wrap(err, "error getting latest ledger")
	}

	// Tell the captive core instance to emit LedgerCloseMeta starting at
	// latest network ledger and continuing indefinitely, streaming.
	if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil {
		return errors.Wrap(err, "error preparing captive core ledger range")
	}

	// Run endless loop that receives LedgerCloseMeta from captive core for each new
	// ledger generated by the network and pushes it to next processors in pipeline.
	// The loop has no exit condition: it ends only by returning an error.
	for nextLedger := latestNetworkLedger; ; nextLedger++ {
		ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger)
		if err != nil {
			return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger)
		}

		payload, err := ledgerCloseMeta.MarshalBinary()
		if err != nil {
			return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger)
		}

		log.Printf("Processing Ledger %v", nextLedger)
		for _, processor := range adapter.processors {
			if err := processor.Process(ctx, Message{Payload: payload}); err != nil {
				return errors.Wrapf(err, "failed to process ledger %d", nextLedger)
			}
		}
	}
}

// main wires the three pipeline stages together (captive core inbound adapter
// -> payment transformer -> ZeroMQ outbound adapter) and runs the pipeline
// until it fails or the process is interrupted.
func main() {
	// Only os.Interrupt is registered: SIGKILL (os.Kill) cannot be trapped by
	// a process, so listing it would have no effect.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// create the inbound 'source of origin' adapter,
	// imports network data using this captive core config
	networkInboundAdapter := &CaptiveCoreInboundAdapter{
		historyArchiveURLs:        network.TestNetworkhistoryArchiveURLs,
		coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig,
		TomlParams: ledgerbackend.CaptiveCoreTomlParams{
			NetworkPassphrase:  network.TestNetworkPassphrase,
			HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs,
			UseDB:              true,
			CoreBinaryPath:     "stellar-core", // assumes you have installed stellar-core on your o/s PATH
		},
	}

	// create the app transformer to convert network data to application data model
	appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase}

	// create the outbound adapter, this is the end point of the pipeline
	// publishes application data model as messages to a broker
	publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555")
	if err != nil {
		log.Printf("error creating 0MQ publisher: %v", err)
		return
	}
	defer publisher.Destroy()
	outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher}

	// wire up the ingestion pipeline and let it run
	appTransformer.Subscribe(outboundAdapter)
	networkInboundAdapter.Subscribe(appTransformer)
	log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx))
}

distributed_payment_subscriber.py​

A Python script demonstrating how we now have distributed processing and an event-driven architecture by leveraging the MQ broker to push the derived application payment data model out to other microservices. Make sure to pip install pyzmq first.

import sys
import zmq
import json

# Socket to talk to server.
# The context managers close the socket and terminate the ZeroMQ context once
# the payments have been collected, or if an error interrupts the loop.
with zmq.Context() as context, context.socket(zmq.SUB) as socket:
    print("Collecting next 10 payments from pipeline ...")
    socket.connect("tcp://127.0.0.1:5555")
    # An empty prefix subscribes to every message the publisher sends.
    socket.subscribe("")

    for _ in range(10):
        # Block until the pipeline publishes the next JSON-encoded payment.
        message = socket.recv()
        json_object = json.loads(message)
        json_formatted_str = json.dumps(json_object, indent=2)
        print(f"Received payment:\n\n{json_formatted_str}")