From 6e939bb32f958a5d73e63ac54957aacbc7c81951 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 17 Jun 2024 00:53:30 -0700 Subject: [PATCH 1/7] docpocalypse: building app for ingest sdk --- docs/building-apps/ingest-sdk/_category_.json | 7 + .../ingest-sdk/ingestion-pipeline-code.mdx | 290 ++++++++++++++++++ docs/building-apps/ingest-sdk/overview.mdx | 62 ++++ 3 files changed, 359 insertions(+) create mode 100644 docs/building-apps/ingest-sdk/_category_.json create mode 100644 docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx create mode 100644 docs/building-apps/ingest-sdk/overview.mdx diff --git a/docs/building-apps/ingest-sdk/_category_.json b/docs/building-apps/ingest-sdk/_category_.json new file mode 100644 index 000000000..c9e5196bb --- /dev/null +++ b/docs/building-apps/ingest-sdk/_category_.json @@ -0,0 +1,7 @@ +{ + "position": 55, + "label": "Build custom network ingestion pipeline", + "link": { + "type": "generated-index" + } +} diff --git a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx new file mode 100644 index 000000000..24e9e1d6b --- /dev/null +++ b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx @@ -0,0 +1,290 @@ +--- +title: Ingestion Pipeline Sample Code +sidebar_position: 30 +--- + +Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion Sdk](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. + +This example uses ZeroMQ [goczmq](https://github.com/zeromq/goczmq) go wrapper SDK, which requires a few o/s [dependent libraries be installed on host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies) also. + +Put these files in a directory, compile and run with `go build -o pipeline ./.; ./pipeline` + +### `go.mod` + + + +``` +module github.com/stellar/example-ingestion-pipeline + +go 1.22 + +toolchain go1.22.1 + +require ( + github.com/stellar/go v0.0.0-20240614234001-3a31ed780c58 + github.com/zeromq/goczmq v4.1.0+incompatible +) +``` + + + +### `main.go` + + + +```go +package main + +import ( + "context" + "encoding/json" + "io" + "log" + "os" + "os/signal" + + "github.com/pkg/errors" + "github.com/stellar/go/amount" + "github.com/stellar/go/historyarchive" + "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/network" + "github.com/stellar/go/support/datastore" + "github.com/stellar/go/support/storage" + "github.com/stellar/go/xdr" + + "github.com/zeromq/goczmq" +) + +// Application specifics +type AppPayment struct { + Timestamp uint + BuyerAccountId string + SellerAccountId string + AssetCode string + Amount string +} + +// General stream topolgy +type Message struct { + Payload []byte +} + +type Processor interface { + Process(context.Context, Message) error +} + +type Publisher interface { + Subscribe(receiver Processor) +} + +// Ingestion Pipeline Processors +type ZeroMQOutboundAdapter struct { + Publisher *goczmq.Sock +} + +func (adapter *ZeroMQOutboundAdapter) Process(ctx context.Context, msg Message) error { + _, err := adapter.Publisher.Write(msg.Payload) + return err +} + +type AppPaymentTransformer struct { + processors []Processor + networkPassPhrase string +} + +func (transformer *AppPaymentTransformer) Subscribe(receiver Processor) { + transformer.processors = append(transformer.processors, receiver) +} + +func (transformer *AppPaymentTransformer) Process(ctx context.Context, msg Message) error { + ledgerCloseMeta := xdr.LedgerCloseMeta{} + err := ledgerCloseMeta.UnmarshalBinary(msg.Payload) + if err != nil { + return errors.Wrapf(err, "failed to unmarshal message payload to LedgerCloseMeta") + } + + ledgerTxReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(transformer.networkPassPhrase, ledgerCloseMeta) + if err != nil { + return errors.Wrapf(err, "failed to create reader for ledger %v", ledgerCloseMeta.LedgerSequence()) + } + + closeTime := uint(ledgerCloseMeta.LedgerHeaderHistoryEntry().Header.ScpValue.CloseTime) + + // scan all transactions in a ledger for payments to derive new model from + transaction, err := ledgerTxReader.Read() + for ; err == nil; transaction, err = ledgerTxReader.Read() { + for _, op := range transaction.Envelope.Operations() { + switch op.Body.Type { + case xdr.OperationTypePayment: + networkPayment := op.Body.MustPaymentOp() + myPayment := AppPayment{ + Timestamp: closeTime, + BuyerAccountId: networkPayment.Destination.Address(), + SellerAccountId: op.SourceAccount.Address(), + AssetCode: networkPayment.Asset.StringCanonical(), + Amount: amount.String(networkPayment.Amount), + } + jsonBytes, err := json.Marshal(myPayment) + if err != nil { + return err + } + + for _, processor := range transformer.processors { + processor.Process(ctx, Message{Payload: jsonBytes}) + } + } + } + } + if err != io.EOF { + return errors.Wrapf(err, "failed to read transaction from ledger %v", ledgerCloseMeta.LedgerSequence()) + } + return nil +} + +type CaptiveCoreInboundAdapter struct { + TomlParams ledgerbackend.CaptiveCoreTomlParams + processors []Processor + historyArchiveURLs []string + coreConfigNetworkTemplate []byte +} + +func (adapter *CaptiveCoreInboundAdapter) Subscribe(receiver Processor) { + adapter.processors = append(adapter.processors, receiver) +} + +func (adapter *CaptiveCoreInboundAdapter) Run(ctx context.Context) error { + // Setup captive core config to use the Pubnet network + captiveCoreToml, err := ledgerbackend.NewCaptiveCoreTomlFromData(adapter.coreConfigNetworkTemplate, adapter.TomlParams) + if err != nil { + return errors.Wrap(err, "error creating captive core toml") + } + + captiveConfig := ledgerbackend.CaptiveCoreConfig{ + BinaryPath: adapter.TomlParams.CoreBinaryPath, + HistoryArchiveURLs: adapter.TomlParams.HistoryArchiveURLs, + Context: ctx, + Toml: captiveCoreToml, + } + + // Create a new captive core backend, the origin of ingestion pipeline + captiveBackend, err := ledgerbackend.NewCaptive(captiveConfig) + if err != nil { + return errors.Wrap(err, "error creating captive core instance") + } + + // Create a client to the network's history archives + historyAchive, err := historyarchive.NewArchivePool(adapter.historyArchiveURLs, historyarchive.ArchiveOptions{ + ConnectOptions: storage.ConnectOptions{ + UserAgent: "my_app", + Context: ctx, + }, + }) + + if err != nil { + return errors.Wrap(err, "error creating history archive client") + } + + // Acquire the most recent ledger on network + latestNetworkLedger, err := datastore.GetLatestLedgerSequenceFromHistoryArchives(historyAchive) + if err != nil { + return errors.Wrap(err, "error getting latest ledger") + } + + // Tell the captive core instance to emit LedgerCloseMeta starting at + // latest network ledger and continuing indefintely, streaming. + if err := captiveBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(latestNetworkLedger)); err != nil { + return errors.Wrap(err, "error preparing captive core ledger range") + } + + // Run endless loop that receives LedgerCloseMeta from captive core for each new + // ledger generated by the network and pushes it to next processors in pipeline + for nextLedger := latestNetworkLedger; true; nextLedger++ { + ledgerCloseMeta, err := captiveBackend.GetLedger(ctx, nextLedger) + if err != nil { + return errors.Wrapf(err, "failed to retrieve ledger %d from the ledger backend", nextLedger) + } + + payload, err := ledgerCloseMeta.MarshalBinary() + if err != nil { + return errors.Wrapf(err, "failed to encode ledger %d from xdr to binary", nextLedger) + } + + log.Printf("Processing Ledger %v", nextLedger) + for _, processor := range adapter.processors { + if err := processor.Process(ctx, Message{Payload: payload}); err != nil { + return errors.Wrapf(err, "failed to process ledger %d", nextLedger) + } + } + } + return nil +} + +func main() { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer stop() + + // create the inbound 'source of origin' adapter, + // imports network data using this captive core config + networkInboundAdapter := &CaptiveCoreInboundAdapter{ + historyArchiveURLs: network.TestNetworkhistoryArchiveURLs, + coreConfigNetworkTemplate: ledgerbackend.TestnetDefaultConfig, + TomlParams: ledgerbackend.CaptiveCoreTomlParams{ + NetworkPassphrase: network.TestNetworkPassphrase, + HistoryArchiveURLs: network.TestNetworkhistoryArchiveURLs, + UseDB: true, + CoreBinaryPath: "stellar-core", // assumes you have installed stellar-core on your o/s PATH + }, + } + + // create the app transformer to convert network data to application data model + appTransformer := &AppPaymentTransformer{networkPassPhrase: network.TestNetworkPassphrase} + + // create the outbound adapter, this is the end point of the pipeline + // publishes application data model as messages to a broker + publisher, err := goczmq.NewPub("tcp://127.0.0.1:5555") + if err != nil { + log.Printf("error creating 0MQ publisher: %v", err) + return + } + defer publisher.Destroy() + outboundAdapter := &ZeroMQOutboundAdapter{Publisher: publisher} + + // wire up the ingestion pipeline and let it run + appTransformer.Subscribe(outboundAdapter) + networkInboundAdapter.Subscribe(appTransformer) + log.Printf("Payment ingestion pipeline ended %v", networkInboundAdapter.Run(ctx)) +} +``` + + + +### `distributed_payment_subsciber.py` + +A python script demonstrating how we now have distributed processing and event driven architecture by leveraging the MQ Broker to push derived application payment data model out to other microservices. make sure to `pip install pyzmq` + + + +```python +import sys +import zmq +import json + +# Socket to talk to server +context = zmq.Context() +socket = context.socket(zmq.SUB) + +print("Collecting next 10 payents from pipeline ...") +socket.connect("tcp://127.0.0.1:5555") +socket.subscribe("") + +for request in range(10): + + message = socket.recv() + json_object = json.loads(message) + json_formatted_str = json.dumps(json_object, indent=2) + print(f"Received payment:\n\n{json_formatted_str}") + +``` + + diff --git a/docs/building-apps/ingest-sdk/overview.mdx b/docs/building-apps/ingest-sdk/overview.mdx new file mode 100644 index 000000000..a17660db9 --- /dev/null +++ b/docs/building-apps/ingest-sdk/overview.mdx @@ -0,0 +1,62 @@ +--- +title: Overview +sidebar_position: 10 +--- + +This tutorial walks through how an application can leverage common streaming data patterns to ingest Stellar network transaction data using a few select packages from the Stellar Go Repo [github.com/stellar/go](https://github.com/stellar/go/blob/master/) collectively known as the 'Ingestion' SDK: + +## The Ingestion SDK packages + +- `github.com/stellar/go/amount` utility package to convert prices from network transaction operations to string +- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects. +- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer centric `LedgerTransaction` model +- `github.com/stellar/go/ingest/ledgerbackend` provides the captive core backend implementation +- `github.com/stellar/go/network` provides convenient pre-configured settings for testnet and pubnet networks. +- `github.com/stellar/go/xdr` a complete golang binding to the stellar network data model. + +## Ingestion project setup + +### Project requirements + +To build an example streaming network ingestion pipeline from live Stellar network transaction data, you'll need: + +- A developer workstation with [Go](https://go.dev/learn/) programming language runtime installed. +- An IDE to edit Go code, [VSCode](https://code.visualstudio.com/download) is good if one needed. +- A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline` +- `stellar-core` must be [installed](https://developers.stellar.org/network/core-node/admin-guide/installation) on your workstation and available on your o/s PATH. + +The [Stellar network data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is defined in an IDL format expressed in[XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment. and defines it's own data model internally: + + + +``` +::AppPayment + Timestamp: uint + BuyerAccountId: string + SellerAccountId: string + AssetCode: string + Amount: string +} +``` + + + +The example application will run a [network ingestion pipeline](https://github.com/stellar/go/blob/master/ingest/doc.go) to derive a smaller `ApplicationPayment` model from the [Stellar network transaction data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) as 'source of origin' and thus enable the application to avoid large compute resources that would have been required for maintaing storage of the full stellar network data model. + +The ingestion pipeline will perform three distinct stream processor roles: + +### Inbound Adapter + +Acts as the 'source of origin' for the pipeline. Retrieves [LedgerCloseMeta](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) generated from a Stellar network using captive core. `LedgerCloseMeta` is the top level aggregate in the Stellar data model of which all [Stellar network transaction data](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is nested within. Publishes the `LedgerCloseMeta` onto the pipeline. + +### Transformer + +Subscribes to receive `LedgerCloseMeta` from the pipeline. Uses the Go SDK package [github.com/stellar/go/xdr](https://github.com/stellar/go/tree/master/xdr) to parse the nested network data model for payment operations and convert those into a new instance of application data model `ApplicationPayment` instances. Publishes `ApplicationPayment` to the pipeline. + +### Outbound Adapter + +Acts as the termination of the pipeline, it subscribes to receive `ApplicationPayment` and publishes the data off the pipeline and to an external data store, a ZeroMQ Publisher Socket, which is essentially a message broker. + +### Summary + +Refer to [Ingestion Pipeline Sample Application](ingestion-pipeline-code.mdx), for complete code demonstrating usage of the 'ingestion' sdk packages to create these adapters and transfomers and run a live pipeline against Stellar network. From 9aa9f226b107bcd0ba47d6e3c5b3a0f53f8d3eb4 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 17 Jun 2024 01:07:35 -0700 Subject: [PATCH 2/7] docpocalypse: add more comments on sample code --- docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx index 24e9e1d6b..c25a8de7b 100644 --- a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx +++ b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx @@ -3,7 +3,7 @@ title: Ingestion Pipeline Sample Code sidebar_position: 30 --- -Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion Sdk](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. +Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion Sdk](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event driven, distributed processing with a sample microservice(python script) as subscriber. This example uses ZeroMQ [goczmq](https://github.com/zeromq/goczmq) go wrapper SDK, which requires a few o/s [dependent libraries be installed on host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies) also. From 25dba75a7ddb4190eaba44a6916a471b1b08cbce Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 21 Jun 2024 16:07:38 -0700 Subject: [PATCH 3/7] fixed example module name, per review feedgback --- docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx index c25a8de7b..21ac8e85d 100644 --- a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx +++ b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx @@ -14,7 +14,7 @@ Put these files in a directory, compile and run with `go build -o pipeline ./.; ``` -module github.com/stellar/example-ingestion-pipeline +module example/pipeline go 1.22 From b2c39ed257e03bba150b8175f2207544dec36eaf Mon Sep 17 00:00:00 2001 From: shawn Date: Fri, 21 Jun 2024 16:11:50 -0700 Subject: [PATCH 4/7] fixed typo, review feedback Co-authored-by: urvisavla --- docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx index 21ac8e85d..197592108 100644 --- a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx +++ b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx @@ -274,7 +274,7 @@ import json context = zmq.Context() socket = context.socket(zmq.SUB) -print("Collecting next 10 payents from pipeline ...") +print("Collecting next 10 payments from pipeline ...") socket.connect("tcp://127.0.0.1:5555") socket.subscribe("") From 0053929fc6e4ce51583a005e539bc2e63f9f2538 Mon Sep 17 00:00:00 2001 From: shawn Date: Fri, 21 Jun 2024 16:12:28 -0700 Subject: [PATCH 5/7] fixed punctuation, review feedback Co-authored-by: urvisavla --- docs/building-apps/ingest-sdk/overview.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/building-apps/ingest-sdk/overview.mdx b/docs/building-apps/ingest-sdk/overview.mdx index a17660db9..6d6dc8ae4 100644 --- a/docs/building-apps/ingest-sdk/overview.mdx +++ b/docs/building-apps/ingest-sdk/overview.mdx @@ -25,7 +25,7 @@ To build an example streaming network ingestion pipeline from live Stellar netwo - A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline` - `stellar-core` must be [installed](https://developers.stellar.org/network/core-node/admin-guide/installation) on your workstation and available on your o/s PATH. -The [Stellar network data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is defined in an IDL format expressed in[XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment. and defines it's own data model internally: +The [Stellar network data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines it's own data model internally: From 6e750bcc56c1d6ed9d33d494ed6541cceaa1497e Mon Sep 17 00:00:00 2001 From: Bri <92327786+briwylde08@users.noreply.github.com> Date: Mon, 24 Jun 2024 11:46:06 -0600 Subject: [PATCH 6/7] Overview, spelling/grammar nits --- docs/building-apps/ingest-sdk/_category_.json | 2 +- docs/building-apps/ingest-sdk/overview.mdx | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/building-apps/ingest-sdk/_category_.json b/docs/building-apps/ingest-sdk/_category_.json index c9e5196bb..401395737 100644 --- a/docs/building-apps/ingest-sdk/_category_.json +++ b/docs/building-apps/ingest-sdk/_category_.json @@ -1,6 +1,6 @@ { "position": 55, - "label": "Build custom network ingestion pipeline", + "label": "Build Custom Network Ingestion Pipeline", "link": { "type": "generated-index" } diff --git a/docs/building-apps/ingest-sdk/overview.mdx b/docs/building-apps/ingest-sdk/overview.mdx index 6d6dc8ae4..4002c646f 100644 --- a/docs/building-apps/ingest-sdk/overview.mdx +++ b/docs/building-apps/ingest-sdk/overview.mdx @@ -8,11 +8,11 @@ This tutorial walks through how an application can leverage common streaming dat ## The Ingestion SDK packages - `github.com/stellar/go/amount` utility package to convert prices from network transaction operations to string -- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects. -- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer centric `LedgerTransaction` model +- `github.com/stellar/go/historyarchive` `github.com/stellar/go/support/datastore` `github.com/stellar/go/support/storage` utility package with convenient wrappers for accessing history archives, and avoid low-level http aspects +- `github.com/stellar/go/ingest` provides parsing functionality over the network ledger data, converts to more developer-centric `LedgerTransaction` model - `github.com/stellar/go/ingest/ledgerbackend` provides the captive core backend implementation -- `github.com/stellar/go/network` provides convenient pre-configured settings for testnet and pubnet networks. -- `github.com/stellar/go/xdr` a complete golang binding to the stellar network data model. +- `github.com/stellar/go/network` provides convenient pre-configured settings for Testnet and Mainnet networks +- `github.com/stellar/go/xdr` a complete Golang binding to the Stellar network data model ## Ingestion project setup @@ -20,12 +20,12 @@ This tutorial walks through how an application can leverage common streaming dat To build an example streaming network ingestion pipeline from live Stellar network transaction data, you'll need: -- A developer workstation with [Go](https://go.dev/learn/) programming language runtime installed. -- An IDE to edit Go code, [VSCode](https://code.visualstudio.com/download) is good if one needed. +- A developer workstation with [Go](https://go.dev/learn/) programming language runtime installed +- An IDE to edit Go code, [VSCode](https://code.visualstudio.com/download) is good if one is needed - A newly initialized, empty Go project folder. `mkdir pipeline; cd pipeline; go mod init example/pipeline` -- `stellar-core` must be [installed](https://developers.stellar.org/network/core-node/admin-guide/installation) on your workstation and available on your o/s PATH. +- `stellar-core` must be [installed](https://developers.stellar.org/network/core-node/admin-guide/installation) on your workstation and available on your o/s PATH -The [Stellar network data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines it's own data model internally: +The [Stellar network data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is defined in an IDL format expressed in [XDR encoding](https://github.com/stellar/stellar-xdr). Our example application is only interested in a small subset of the overall transaction data model related to buying and selling of assets, i.e. a payment, and defines its own data model internally: @@ -41,13 +41,13 @@ The [Stellar network data model](https://developers.stellar.org/docs/learn/funda -The example application will run a [network ingestion pipeline](https://github.com/stellar/go/blob/master/ingest/doc.go) to derive a smaller `ApplicationPayment` model from the [Stellar network transaction data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) as 'source of origin' and thus enable the application to avoid large compute resources that would have been required for maintaing storage of the full stellar network data model. +The example application will run a [network ingestion pipeline](https://github.com/stellar/go/blob/master/ingest/doc.go) to derive a smaller `ApplicationPayment` model from the [Stellar network transaction data model](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) as 'source of origin' and thus enable the application to avoid large compute resources that would have been required for maintaining storage of the full Stellar network data model. The ingestion pipeline will perform three distinct stream processor roles: ### Inbound Adapter -Acts as the 'source of origin' for the pipeline. Retrieves [LedgerCloseMeta](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) generated from a Stellar network using captive core. `LedgerCloseMeta` is the top level aggregate in the Stellar data model of which all [Stellar network transaction data](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is nested within. Publishes the `LedgerCloseMeta` onto the pipeline. +Acts as the 'source of origin' for the pipeline. Retrieves [LedgerCloseMeta](https://github.com/stellar/go/blob/f30d11432e81c7a7cbb739a694520f729bbb31dd/xdr/xdr_generated.go#L18358) generated from a Stellar network using captive core. `LedgerCloseMeta` is the top-level aggregate in the Stellar data model of which all [Stellar network transaction data](https://developers.stellar.org/docs/learn/fundamentals/stellar-data-structures) is nested within. Publishes the `LedgerCloseMeta` onto the pipeline. ### Transformer @@ -59,4 +59,4 @@ Acts as the termination of the pipeline, it subscribes to receive `ApplicationPa ### Summary -Refer to [Ingestion Pipeline Sample Application](ingestion-pipeline-code.mdx), for complete code demonstrating usage of the 'ingestion' sdk packages to create these adapters and transfomers and run a live pipeline against Stellar network. +Refer to [Ingestion Pipeline Sample Application](ingestion-pipeline-code.mdx) for complete code demonstrating usage of the 'ingestion' SDK packages to create these adapters and transformers and run a live pipeline against the Stellar network. From d234ec30e3812c8538196707c9ee79b92f96e384 Mon Sep 17 00:00:00 2001 From: Bri <92327786+briwylde08@users.noreply.github.com> Date: Mon, 24 Jun 2024 11:54:05 -0600 Subject: [PATCH 7/7] Ingestion Pipeline Sample Code editorial nits --- docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx index 197592108..39cc46ca0 100644 --- a/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx +++ b/docs/building-apps/ingest-sdk/ingestion-pipeline-code.mdx @@ -3,9 +3,9 @@ title: Ingestion Pipeline Sample Code sidebar_position: 30 --- -Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion Sdk](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event driven, distributed processing with a sample microservice(python script) as subscriber. +Complete code for a small sample of a Stellar network ingestion pipeline using the Stellar Go [Ingestion SDK](overview.mdx#the-ingestion-sdk-packages) to publish derived data to a remote message broker. Demonstrate event-driven, distributed processing with a sample microservice (Python script) as subscriber. -This example uses ZeroMQ [goczmq](https://github.com/zeromq/goczmq) go wrapper SDK, which requires a few o/s [dependent libraries be installed on host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies) also. +This example uses the ZeroMQ [goczmq](https://github.com/zeromq/goczmq) Go wrapper SDK, which requires a few o/s [dependent libraries to also be installed on the host machine](https://github.com/zeromq/goczmq?tab=readme-ov-file#dependencies). Put these files in a directory, compile and run with `go build -o pipeline ./.; ./pipeline` @@ -65,7 +65,7 @@ type AppPayment struct { Amount string } -// General stream topolgy +// General stream topology type Message struct { Payload []byte } @@ -261,7 +261,7 @@ func main() { ### `distributed_payment_subsciber.py` -A python script demonstrating how we now have distributed processing and event driven architecture by leveraging the MQ Broker to push derived application payment data model out to other microservices. make sure to `pip install pyzmq` +A Python script demonstrating how we now have distributed processing and event driven architecture by leveraging the MQ Broker to push derived application payment data model out to other microservices. Make sure to `pip install pyzmq`