The Pessimism ETL is a generalized abstraction for a DAG-based process system that continuously transforms chain data into inputs for consumption by a Risk Engine in the form of intertwined data “paths”. Representing ETL operations as a DAG lets the application scale to support many active heuristics: modularized ETL processes can be reused, and conflicting paths can be de-duplicated under certain key logical circumstances.

Process

A process refers to a graph node within the ETL system. Every process performs some operation for transforming data from any data source into a consumable input for the Risk Engine to ingest. Currently, there are three process types, of which the third is still in development:

  1. Subscriber - Used to perform local arbitrary computations (e.g. Extracting L1Withdrawal transactions from a block)
  2. Reader - Used to poll and collect data from some third-party source (e.g. Querying real-time account balance amounts from an op-geth execution client)
  3. Aggregator - (TBD) Used to synchronize multiple data streams into a single input

Inter-Connectivity

The diagram below shows how processes interact:

graph LR; A((Process0)) -->|dataX| C[Ingress]; subgraph B["Process1"] C --> D[ingressHandler]; D --> |dataX| E(eventLoop); E --> |dataY| F[egressHandler]; F --> |dataY| G[egress0]; F --> |dataY| H[egress1]; end G --> K((Process2)); H --> J((Process3));

Egress Handler

All process types use an egressHandler struct for routing transit data to actively subscribed downstream ETL processes.

flowchart TD; Process-->|Transit_Data|A[Egress0]; Process-->|Transit_Data|B[Egress1];

Ingress Handler

All process types also use an ingressHandler struct for ingesting active transit data from upstream ETL processes.
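The relationship between the two handlers can be sketched in Go roughly as follows. This is a minimal illustration, not the project's actual code: the `TransitData` fields, the `AddEgress` and `Send` method names, and the string IDs are all assumptions made for the example. It shows an egress handler holding the ingress channels of two downstream processes and fanning the same transit data out to both.

```go
package main

import "fmt"

// TransitData is a hypothetical envelope for data moving between processes.
type TransitData struct {
	Type  string
	Value any
}

// ingressHandler owns the channel that upstream processes write into.
type ingressHandler struct {
	in chan TransitData
}

// egressHandler tracks the ingress channels of subscribed downstream processes.
type egressHandler struct {
	egresses map[string]chan<- TransitData
}

func newEgressHandler() *egressHandler {
	return &egressHandler{egresses: make(map[string]chan<- TransitData)}
}

// AddEgress subscribes a downstream process, keyed by a hypothetical ID string.
func (e *egressHandler) AddEgress(id string, ch chan<- TransitData) error {
	if _, ok := e.egresses[id]; ok {
		return fmt.Errorf("egress %s already exists", id)
	}
	e.egresses[id] = ch
	return nil
}

// Send fans the same transit data out to every subscribed downstream process.
func (e *egressHandler) Send(td TransitData) {
	for _, ch := range e.egresses {
		ch <- td
	}
}

func main() {
	// Buffered channels keep this single-goroutine demo from blocking.
	p2 := ingressHandler{in: make(chan TransitData, 1)}
	p3 := ingressHandler{in: make(chan TransitData, 1)}

	eh := newEgressHandler()
	_ = eh.AddEgress("process2", p2.in)
	_ = eh.AddEgress("process3", p3.in)

	eh.Send(TransitData{Type: "dataY", Value: 42})
	fmt.Println((<-p2.in).Value, (<-p3.in).Value)
}
```

Keying egresses by ID makes it cheap to reject duplicate subscriptions, which is one plausible way to keep a process from receiving the same data twice.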

Process ID

All processes have an ID that stores critical identification data. Process IDs are used by higher order abstractions to:

  • Represent a process DAG
  • Understand when duplicate processes are generated

Process IDs consist of both a randomly generated UUID and a deterministic PID. The UUID ensures that each process instance is unique, while the PID deliberately collides for equivalent processes so that they can be reused when viable.

A ProcIdentifier is encoded using the following four-byte sequence:

            0        1        2        3        4
            |--------|--------|--------|--------|
             network  path     process  register
             id       type     type     type
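The four-byte layout above could be modelled in Go as a fixed-size array, as in the sketch below. The type name `ProcPID`, the constructor, the accessors, and the numeric enum values are assumptions for illustration only; the document does not specify them. Because Go arrays are comparable, two processes with the same network, path type, process type, and register type produce equal PIDs, which is the collision property described above.

```go
package main

import "fmt"

// ProcPID is the deterministic four-byte portion of a process ID.
type ProcPID [4]byte

// Hypothetical enum values; the real constants are not given in this document.
const (
	NetworkLayer1       byte = 1
	PathTypeLive        byte = 2
	ProcessTypeReader   byte = 3
	RegisterBlockHeader byte = 4
)

// MakeProcPID packs the four fields in the documented byte order.
func MakeProcPID(network, pathType, procType, regType byte) ProcPID {
	return ProcPID{network, pathType, procType, regType}
}

func (p ProcPID) Network() byte      { return p[0] }
func (p ProcPID) PathType() byte     { return p[1] }
func (p ProcPID) ProcessType() byte  { return p[2] }
func (p ProcPID) RegisterType() byte { return p[3] }

func main() {
	a := MakeProcPID(NetworkLayer1, PathTypeLive, ProcessTypeReader, RegisterBlockHeader)
	b := MakeProcPID(NetworkLayer1, PathTypeLive, ProcessTypeReader, RegisterBlockHeader)

	// Equal field values yield equal PIDs, so the second process is a reuse candidate.
	fmt.Println("collision:", a == b)
	fmt.Println("register type:", a.RegisterType())
}
```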

State Update Handling

NOTE - State handling policies for management abstractions have yet to be properly fleshed out

Subscriber

Subscribers are used to perform arbitrary transformations on some provided upstream input data. Once input data processing has been completed, the output data is then submitted to its respective destination(s).

Attributes

  • An ActivityState channel with a path manager
  • Ingress handler that other processes can write to
  • TransformFunc - A processing function that performs some data translation/transformation on respective inputs
  • An egressHandler that stores dependencies to write to (i.e. Other path processes, heuristic engine)
  • A specified output data type
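A Subscriber with the attributes listed above might look like the following Go sketch. It is an assumption-laden illustration: the struct layout, the `handle` helper, the string-valued `TransitData`, and the toy `upper` transform are invented for the example, and the ActivityState channel is omitted for brevity.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// TransitData is a hypothetical envelope for data passed between processes.
type TransitData struct {
	Type  string
	Value string
}

// TransformFunc translates one input into zero or more outputs.
type TransformFunc func(TransitData) ([]TransitData, error)

// Subscriber holds an ingress, a transform, and downstream egresses.
type Subscriber struct {
	ingress   chan TransitData
	egresses  []chan<- TransitData
	transform TransformFunc
	outType   string
}

// handle transforms one input and routes every output to each egress.
func (s *Subscriber) handle(in TransitData) error {
	outs, err := s.transform(in)
	if err != nil {
		return err
	}
	for _, out := range outs {
		out.Type = s.outType
		for _, eg := range s.egresses {
			eg <- out
		}
	}
	return nil
}

// EventLoop processes ingress data until the context is cancelled.
func (s *Subscriber) EventLoop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case in := <-s.ingress:
			if err := s.handle(in); err != nil {
				fmt.Println("transform failed:", err)
			}
		}
	}
}

// upper is a toy TransformFunc used only for this demonstration.
func upper(in TransitData) ([]TransitData, error) {
	return []TransitData{{Value: strings.ToUpper(in.Value)}}, nil
}

func main() {
	out := make(chan TransitData, 1)
	s := &Subscriber{
		ingress:   make(chan TransitData),
		egresses:  []chan<- TransitData{out},
		transform: upper,
		outType:   "upper_text",
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go s.EventLoop(ctx)

	s.ingress <- TransitData{Type: "text", Value: "withdrawal"}
	fmt.Println((<-out).Value)
}
```

Keeping the transform as a plain function value means the event loop stays identical across Subscribers; only the injected `TransformFunc` and output type differ.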

Example Use Case(s)

  • Generating opcode traces for some EVM transaction
  • Parsing emitted events from a transaction

Reader

Readers are responsible for collecting data from some external third party (e.g. L1 geth node, L2 rollup node, etc.). As of now, readers are configurable through a standard OracleDefinition interface that allows developers to write arbitrary reader logic. The following key interface function is supported/enforced:

  • ReadRoutine - Routine used for reading/polling real-time data for some arbitrarily configured data source

Unlike other processes, Readers employ two goroutines to operate safely. This is because the definition routine runs as a separate goroutine with a communication channel to the Reader event loop. This is visualized below:

graph LR; subgraph A[Reader] B[eventLoop]-->|channel|ODefRoutine; B[eventLoop]-->|context|ODefRoutine; B-->B; end
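The two-goroutine arrangement in the diagram can be sketched in Go as below. Everything here is illustrative: the `ReadDefinition` interface shape, the `counterDef` fake data source, and the `collect` helper are assumptions, not the project's API. The event loop starts the definition routine in its own goroutine, shares a context with it for shutdown, and relays whatever arrives on the channel to the egress.

```go
package main

import (
	"context"
	"fmt"
)

// TransitData is a hypothetical envelope for data passed between processes.
type TransitData struct {
	Type  string
	Value int
}

// ReadDefinition is a hypothetical stand-in for the definition interface.
type ReadDefinition interface {
	ReadRoutine(ctx context.Context, out chan<- TransitData) error
}

// counterDef emits n fake "blocks" and then idles until cancelled.
type counterDef struct{ n int }

func (c counterDef) ReadRoutine(ctx context.Context, out chan<- TransitData) error {
	for i := 0; i < c.n; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- TransitData{Type: "block", Value: i}:
		}
	}
	<-ctx.Done()
	return ctx.Err()
}

// Reader pairs a definition with a single downstream egress.
type Reader struct {
	def    ReadDefinition
	egress chan<- TransitData
}

// EventLoop starts the definition routine in its own goroutine, then relays
// whatever it reads to the egress until the shared context is cancelled.
func (r *Reader) EventLoop(ctx context.Context) {
	readCh := make(chan TransitData)
	routineDone := make(chan struct{})
	go func() {
		defer close(routineDone)
		_ = r.def.ReadRoutine(ctx, readCh)
	}()
	defer func() { <-routineDone }() // wait for the routine to exit

	for {
		select {
		case <-ctx.Done():
			return
		case td := <-readCh:
			r.egress <- td
		}
	}
}

// collect runs a Reader and returns its first n outputs (n must not exceed
// what the definition emits, or this helper blocks).
func collect(def ReadDefinition, n int) []TransitData {
	out := make(chan TransitData, n)
	ctx, cancel := context.WithCancel(context.Background())
	loopDone := make(chan struct{})
	r := &Reader{def: def, egress: out}
	go func() { r.EventLoop(ctx); close(loopDone) }()

	got := make([]TransitData, 0, n)
	for i := 0; i < n; i++ {
		got = append(got, <-out)
	}
	cancel()
	<-loopDone
	return got
}

func main() {
	for _, td := range collect(counterDef{n: 3}, 3) {
		fmt.Println(td.Type, td.Value)
	}
}
```

Separating the read routine from the event loop means a slow or blocking data source cannot stall shutdown handling: the loop always remains responsive to the context.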

Attributes

  • A communication channel with the path manager
  • Poller/subscription logic that performs real-time data reads on some third-party source
  • An egressHandler that stores dependencies to write to (i.e. Other path processes, heuristic engine)
  • A specified output data type

  • (Optional) Interface with some storage (postgres, mongo, etc.) to persist live extracted data
  • (Optional) Backtest support for polling some data between some starting and ending block heights
  • (Optional) Use of an application state cache to understand which parameter sets to sequentially feed to an endpoint

Example Use Case(s)

  • Polling layer-2 block data in real-time for state updates
  • Interval polling of user-provided chain addresses for native ETH amounts

(TBD) Aggregator

NOTE - This process type is still in development

Aggregators are used to solve the problem where a Subscriber or a heuristic input requires multiple sources of data to perform an execution sequence. Since aggregators subscribe to more than one data stream with different output frequencies, they must employ a synchronization policy for collecting and propagating multi-data inputs within a highly asynchronous environment.

Attributes

  • Able to read heterogeneous transit data from an arbitrary number of process ingresses
  • A synchronization policy that defines how different transit data from multiple ingress streams will be aggregated into a collectively bound single piece of data
  • EgressHandler to handle downstream transit data routing to other processes or destinations

Single Value Subscription

Only send output at the update of a single ingress stream

Single Value Subscription refers to a synchronization policy where a bucketed multi-data tuple is submitted every time there’s an update to a single input data queue.

For example, we can have a heuristic that subscribes to blocks from two heterogeneous chains (layer1, layer2), or {ChainA, ChainB}. Let’s assume BLOCK_TIME(ChainA) > BLOCK_TIME(ChainB).

We can either specify that the heuristic will run every time there’s an update or a new block from ChainA:

{
   "A:latest_blocks": [xi] where cardinality = 1,
   "B:latest_blocks": [yj, ..., yn] where cardinality >= 1,
}

Or we can specify the inverse, so that the heuristic runs every time there’s a new block from ChainB:

{
   "A:latest_blocks": [NULL OR xi] where cardinality <= 1,
   "B:latest_blocks": [yj] where cardinality = 1,
}

This should be extendable to any number of heterogeneous data sources.
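Since the Aggregator is still in development, the following Go sketch is only one possible reading of the Single Value Subscription policy. The `Block` type, the `singleValueSync` struct, and its `Update` method are hypothetical. Updates from non-trigger streams are buffered, and a bucket is emitted only when the trigger stream updates.

```go
package main

import "fmt"

// Block is a hypothetical stand-in for a chain block.
type Block struct {
	Chain  string
	Height int
}

// singleValueSync buffers every non-trigger stream and flushes the bucket
// when the trigger stream updates.
type singleValueSync struct {
	trigger string
	pending map[string][]Block
}

func newSingleValueSync(trigger string) *singleValueSync {
	return &singleValueSync{trigger: trigger, pending: make(map[string][]Block)}
}

// Update ingests one block. It returns a bucket and true only when the block
// comes from the trigger chain; otherwise the block is buffered.
func (s *singleValueSync) Update(b Block) (map[string][]Block, bool) {
	if b.Chain != s.trigger {
		s.pending[b.Chain] = append(s.pending[b.Chain], b)
		return nil, false
	}
	bucket := s.pending
	bucket[b.Chain] = []Block{b} // the trigger stream always has cardinality 1
	s.pending = make(map[string][]Block)
	return bucket, true
}

func main() {
	policy := newSingleValueSync("A")
	feed := []Block{{"B", 1}, {"B", 2}, {"B", 3}, {"A", 1}, {"B", 4}}
	for _, b := range feed {
		if bucket, ok := policy.Update(b); ok {
			fmt.Printf("A=%d block(s), B=%d block(s)\n", len(bucket["A"]), len(bucket["B"]))
		}
	}
}
```

Constructing the policy with `"B"` as the trigger gives the inverse behavior: each ChainB block emits a bucket in which ChainA holds at most one block, provided ChainA is the slower chain.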

Registry

A registry submodule is used to store all ETL data register definitions that provide the blueprint for a unique ETL process type. A register definition consists of:

  • DataType - The output data type of the process node. This is used for data serialization/deserialization by both the ETL and Risk Engine subsystems.
  • ProcessType - The type of process being invoked (e.g. Reader).
  • ProcessConstructor - Constructor function used to create unique process instances. All processes must implement the Process interface.
  • Dependencies - Ordered slice of data register dependencies that are necessary for the process to operate. For example, a process that requires a geth block would have a dependency list of [geth.block]. This dependency list is used to ensure that the ETL can properly construct a process graph that satisfies all process dependencies.
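A register definition with the four fields above could be expressed in Go along these lines. This is a sketch under assumptions: the `Registry` map type, the `Resolve` helper, the minimal `Process` interface, and the `withdrawal.event` register name are hypothetical, while `geth.block` is taken from the dependency example in the text. `Resolve` walks the dependency list so that upstream registers come first, which is the order a process graph would need to be built in.

```go
package main

import "fmt"

type RegisterType string
type ProcessType string

// Process is a hypothetical minimal interface that all processes implement.
type Process interface{ Type() ProcessType }

type stubProcess struct{ pt ProcessType }

func (s stubProcess) Type() ProcessType { return s.pt }

// newStub returns a constructor producing a placeholder process.
func newStub(pt ProcessType) func() (Process, error) {
	return func() (Process, error) { return stubProcess{pt}, nil }
}

// DataRegister mirrors the register definition fields described above.
type DataRegister struct {
	DataType           RegisterType
	ProcessType        ProcessType
	ProcessConstructor func() (Process, error)
	Dependencies       []RegisterType
}

// Registry maps each register type to its definition.
type Registry map[RegisterType]DataRegister

// Resolve returns the registers needed for rt, dependencies first.
// It assumes the dependency declarations are acyclic.
func (r Registry) Resolve(rt RegisterType) ([]RegisterType, error) {
	reg, ok := r[rt]
	if !ok {
		return nil, fmt.Errorf("unknown register %q", rt)
	}
	order := []RegisterType{}
	for _, dep := range reg.Dependencies {
		sub, err := r.Resolve(dep)
		if err != nil {
			return nil, err
		}
		order = append(order, sub...)
	}
	return append(order, rt), nil
}

func main() {
	reg := Registry{
		"geth.block": {
			DataType: "geth.block", ProcessType: "reader",
			ProcessConstructor: newStub("reader"),
		},
		"withdrawal.event": { // hypothetical register name
			DataType: "withdrawal.event", ProcessType: "subscriber",
			ProcessConstructor: newStub("subscriber"),
			Dependencies:       []RegisterType{"geth.block"},
		},
	}
	order, err := reg.Resolve("withdrawal.event")
	fmt.Println(order, err)
}
```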

Addressing

Some processes require knowledge of a specific address to function properly. For example, a reader that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems.

Shown below is how the ETL and Risk Engine interact with the shared state store using a BalanceOracle process as an example:

graph LR;
    subgraph SB["State Store"]
        state
    end
    subgraph EM["Engine Subsystem"]
        SessionHander --> |"Set(PathID, address)"|state
        SessionHander --> |"Delete(PathID, address)"|state
    end
    subgraph ETL["ETL Subsystem"]
        BO --> |"{3} []address"|GETH[("go-ethereum node")]
        GETH --> |"{4} []balance"|BO
        BO("Balance Reader") --> |"{1} Get(PathID)"|state
        BO -."eventLoop()".-> BO
        state --> |"{2} []address"|BO
    end
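The interaction in the diagram can be approximated with a small in-memory store, as in the Go sketch below. The `StateStore` type, the string-typed `PathID`, and `pollBalances` are assumptions for illustration, and the go-ethereum node is replaced by an injected `fetch` function so the example runs without a client. The engine side calls `Set` and `Delete`, while the reader side calls `Get` on each loop iteration and then queries balances for the returned addresses.

```go
package main

import (
	"fmt"
	"sync"
)

// PathID is a hypothetical string key identifying a path.
type PathID string

// StateStore is a minimal concurrency-safe address store shared by both subsystems.
type StateStore struct {
	mu    sync.RWMutex
	addrs map[PathID][]string
}

func NewStateStore() *StateStore {
	return &StateStore{addrs: make(map[PathID][]string)}
}

// Set registers an address for a path (engine side).
func (s *StateStore) Set(id PathID, addr string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.addrs[id] = append(s.addrs[id], addr)
}

// Delete removes an address from a path (engine side).
func (s *StateStore) Delete(id PathID, addr string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	kept := s.addrs[id][:0]
	for _, a := range s.addrs[id] {
		if a != addr {
			kept = append(kept, a)
		}
	}
	s.addrs[id] = kept
}

// Get returns a copy of the addresses for a path (reader side, steps {1} and {2}).
func (s *StateStore) Get(id PathID) []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]string(nil), s.addrs[id]...)
}

// pollBalances performs one reader iteration, covering steps {1} to {4}.
// fetch stands in for the balance query against a node.
func pollBalances(s *StateStore, id PathID, fetch func(string) uint64) map[string]uint64 {
	balances := make(map[string]uint64)
	for _, addr := range s.Get(id) {
		balances[addr] = fetch(addr)
	}
	return balances
}

func main() {
	store := NewStateStore()
	store.Set("path-0", "0xabc")
	store.Set("path-0", "0xdef")
	store.Delete("path-0", "0xdef")

	fakeFetch := func(string) uint64 { return 100 } // placeholder balance
	fmt.Println(pollBalances(store, "path-0", fakeFetch))
}
```

Returning a copy from `Get` keeps the reader from observing a slice that the engine mutates mid-iteration.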

Geth Block Reader Register

A BlockHeader register refers to a block output extracted from a go-ethereum node. This register is used for creating Reader processes that poll and extract block data from a go-ethereum node in real-time.

Geth Account Balance Reader Register

An AccountBalance register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating Reader processes that poll and extract native ETH balance data for some state-persisted addresses from a go-ethereum node in real-time. Unlike the BlockHeader register, this register requires knowledge of an address set that’s shared with the Risk Engine to function properly and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable.

Managed ETL

Process Graph

The ETL uses a ProcessGraph construct to represent and store critical process inter-connectivity data (i.e. process node entries and graph edges).

A graph edge is represented as a bound communication path between two arbitrary process nodes (c1, c2). Adding an edge from some process (c1) to some downstream process (c2) results in c1 having a path to the ingress of c2 in its egress handler. This would look something like:

graph TB;
    subgraph "\nEdge"
        subgraph A[process0]
            B[egressHandler];
        end
        subgraph D[process1]
            B -.-> |egress| C(ingressHandler)
        end
    end
    classDef orange fill:#f96,stroke:#333,stroke-width:4px
    class A,D orange

NOTE: The process graph used in the ETL is represented as a DAG (Directed Acyclic Graph), meaning that no cyclic edge relationships should exist, such as two processes (c1, c2) where c1-->c2 && c2-->c1. While there are no explicit checks for this in the code, it should be impossible given that all processes declare entrypoint register dependencies within their metadata. A process could only become part of a cycle if a process registry definition declared the inverse input->output mapping of an existing process.
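Although the note above says the code performs no explicit cycle check, one could be added cheaply at edge-insertion time. The Go sketch below is a hypothetical illustration of that idea, not a description of the existing implementation: `ProcessGraph` is reduced to an adjacency map over string IDs, and `AddEdge` rejects any edge whose target can already reach its source.

```go
package main

import "fmt"

// ProcessGraph is a simplified adjacency-list view of process connectivity.
type ProcessGraph struct {
	edges map[string][]string
}

func NewProcessGraph() *ProcessGraph {
	return &ProcessGraph{edges: make(map[string][]string)}
}

// reachable reports whether to can be reached from from via existing edges,
// using an iterative depth-first search.
func (g *ProcessGraph) reachable(from, to string) bool {
	seen := map[string]bool{}
	stack := []string{from}
	for len(stack) > 0 {
		n := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if n == to {
			return true
		}
		if seen[n] {
			continue
		}
		seen[n] = true
		stack = append(stack, g.edges[n]...)
	}
	return false
}

// AddEdge connects from --> to unless doing so would introduce a cycle.
func (g *ProcessGraph) AddEdge(from, to string) error {
	if g.reachable(to, from) {
		return fmt.Errorf("edge %s-->%s would create a cycle", from, to)
	}
	g.edges[from] = append(g.edges[from], to)
	return nil
}

func main() {
	g := NewProcessGraph()
	fmt.Println(g.AddEdge("c1", "c2")) // accepted
	fmt.Println(g.AddEdge("c2", "c3")) // accepted
	fmt.Println(g.AddEdge("c3", "c1")) // rejected: c1 already reaches c3
}
```

The search runs over the existing edges on every insertion, which is acceptable for the small graphs an ETL of this kind is likely to hold.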

Path

Paths are used to represent some full process path in a DAG-based ProcessGraph. A path is a sequence of connected processes that together express meaningful ETL operations for extracting some heuristic input for consumption by the Risk Engine.

Path States

  • Backfill - Backfill denotes that the path is currently performing a backfill operation. This means the path is sequentially reading data from some starting height to the most recent block height. This is useful for building state dependent paths that require some knowledge of prior history to make live assessments. For example, detecting imbalances between the native ETH deposit supply on the L1 portal contract and the TVL unlocked on the L2 chain would require indexing the prior history of L1 deposits to construct correct supply values.
  • Live - Live denotes that the path is currently performing live operations. This means the path is reading data from the most recent block height.
  • Stopped - Stopped denotes that the path is currently not performing any operations. This means the path is neither reading nor processing any data.
  • Paused - Paused denotes that the path is currently not performing any operations. This means the path is neither reading nor processing any data. The difference between Stopped and Paused is that a Paused path can be resumed at any time while a Stopped path must be restarted.
  • Error - Error denotes that the path is currently in an error state. This means the path is neither reading nor processing any data. The difference between Stopped and Error is that an Error path can be resumed at any time while a Stopped path must be restarted.
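The five states above map naturally onto a small Go enumeration. The sketch below is illustrative only: the constant names and the `IsActive` and `CanResume` helpers are assumptions, and the resumability rules simply encode the descriptions given in the list (Paused and Error paths can be resumed, Stopped paths must be restarted).

```go
package main

import "fmt"

// ActivityState enumerates the path states described above.
type ActivityState int

const (
	Backfill ActivityState = iota
	Live
	Stopped
	Paused
	Error
)

var stateNames = [...]string{"backfill", "live", "stopped", "paused", "error"}

// String returns a readable name, or "unknown" for out-of-range values.
func (s ActivityState) String() string {
	if s < 0 || int(s) >= len(stateNames) {
		return "unknown"
	}
	return stateNames[s]
}

// IsActive reports whether the path is currently reading or processing data.
func (s ActivityState) IsActive() bool { return s == Backfill || s == Live }

// CanResume reports whether the path can be resumed without a restart.
func (s ActivityState) CanResume() bool { return s == Paused || s == Error }

func main() {
	for _, s := range []ActivityState{Backfill, Live, Stopped, Paused, Error} {
		fmt.Printf("%-8s active=%t resumable=%t\n", s, s.IsActive(), s.CanResume())
	}
}
```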

Path Types

There are two types of paths:

  • Live - A live path is a path that is actively running and performing ETL operations on some data fetched in real-time. For example, a live path could be used to extract newly created block data from a go-ethereum node.
  • Backtest - A backtest path is a path that is used to sequentially backtest some process sequence from some starting to ending block height. For example, a backtest path could be used to backtest a balance_enforcement heuristic between L1 block heights 0 to 1000.

Path UUID (PathID)

All paths have a PathID that stores critical identification data. Path UUIDs are used by higher order abstractions to:

  • Route heuristic inputs between the ETL and Risk Engine
  • Understand when path collisions between PIDs occur

Path UUIDs consist of both a randomly generated UUID and a deterministic PID. The UUID ensures that each path instance is unique, while the PID deliberately collides for equivalent paths so that overlapping processes can be deduplicated when viable.

A PathPID is encoded using the following nine-byte sequence:

            0        1                                        5                                        9
            |--------|----------------------------------------|----------------------------------------|
             path     first path process                       last path process
             type     PID sequence                             PID sequence
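The nine-byte layout can be sketched in Go as one path-type byte followed by the four-byte PIDs of the first and last processes in the path. The type names, constructor, and accessors below are assumptions for illustration, as are the byte values used in `main`.

```go
package main

import "fmt"

// ProcPID is the four-byte deterministic process identifier.
type ProcPID [4]byte

// PathPID is the nine-byte deterministic path identifier.
type PathPID [9]byte

// MakePathPID packs the path type and the first and last process PIDs
// into the documented byte ranges [0,1), [1,5), and [5,9).
func MakePathPID(pathType byte, first, last ProcPID) PathPID {
	var p PathPID
	p[0] = pathType
	copy(p[1:5], first[:])
	copy(p[5:9], last[:])
	return p
}

func (p PathPID) PathType() byte { return p[0] }

// First returns the PID of the first process in the path.
func (p PathPID) First() ProcPID {
	var f ProcPID
	copy(f[:], p[1:5])
	return f
}

// Last returns the PID of the last process in the path.
func (p PathPID) Last() ProcPID {
	var l ProcPID
	copy(l[:], p[5:9])
	return l
}

func main() {
	reader := ProcPID{1, 1, 2, 1}     // hypothetical field values
	subscriber := ProcPID{1, 1, 1, 2} // hypothetical field values

	p0 := MakePathPID(1, reader, subscriber)
	p1 := MakePathPID(1, reader, subscriber)

	fmt.Printf("%x\n", p0)
	fmt.Println("collision:", p0 == p1)
}
```

Because only the first and last process PIDs are encoded, two paths with the same endpoints and path type produce the same PathPID in this sketch, which is what makes PID-based collision detection possible.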

Collision Analysis

NOTE - This section is still in development

Path collisions occur when two paths with the same PID are generated. This can occur when two paths have identical process sequences and valid stateful properties.

For some path collision to occur between two paths (P0, P1), the following properties must hold true:

  1. P0 must have the same PID as P1
  2. P0 and P1 must be live paths that aren’t performing backtests or backfilling operations

Once a collision is detected, the ETL will attempt to deduplicate the path by:

  1. Stopping the event loop of P1
  2. Removing the PID of P1 from the path manager
  3. Merging shared state from P1 to P0
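Because this section is still in development, the following Go sketch is a speculative rendering of the collision conditions and the three deduplication steps. The `Path` fields, the `PathManager` type, and the representation of shared state as a string slice are all hypothetical.

```go
package main

import "fmt"

// PathPID is the nine-byte deterministic path identifier.
type PathPID [9]byte

// Path is a hypothetical, heavily simplified path record.
type Path struct {
	UUID    string
	PID     PathPID
	Live    bool     // false while backtesting or backfilling
	Running bool     // whether the event loop is running
	State   []string // placeholder for shared state, e.g. addresses
}

// collides checks the two documented conditions: same PID, both live.
func collides(p0, p1 *Path) bool {
	return p0.PID == p1.PID && p0.Live && p1.Live
}

// PathManager tracks paths by UUID.
type PathManager struct {
	paths map[string]*Path
}

func NewPathManager() *PathManager {
	return &PathManager{paths: make(map[string]*Path)}
}

func (m *PathManager) Add(p *Path) { m.paths[p.UUID] = p }

// Deduplicate folds p1 into p0 when they collide and reports whether it did.
func (m *PathManager) Deduplicate(p0, p1 *Path) bool {
	if !collides(p0, p1) {
		return false
	}
	p1.Running = false                       // 1. stop the event loop of P1
	delete(m.paths, p1.UUID)                 // 2. remove P1 from the manager
	p0.State = append(p0.State, p1.State...) // 3. merge shared state into P0
	return true
}

func main() {
	pid := PathPID{1, 1, 1, 2, 1, 1, 1, 1, 2}
	p0 := &Path{UUID: "uuid-0", PID: pid, Live: true, Running: true, State: []string{"0xabc"}}
	p1 := &Path{UUID: "uuid-1", PID: pid, Live: true, Running: true, State: []string{"0xdef"}}

	m := NewPathManager()
	m.Add(p0)
	m.Add(p1)

	fmt.Println("deduplicated:", m.Deduplicate(p0, p1))
	fmt.Println("remaining paths:", len(m.paths), "merged state:", p0.State)
}
```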

ETL Manager

EtlManager is used for connecting lower-level objects (Process Graph, Path) together to express meaningful ETL administration logic, i.e.:

  • Creating a new path
  • Removing a path
  • Merging some paths
  • Updating a path