ETL
The Pessimism ETL is a generalized abstraction for a DAG-based process system that continuously transforms chain data into inputs for consumption by a Risk Engine in the form of intertwined data “paths”. Representing ETL operations as a DAG lets the application scale to support many active heuristics: modularized ETL processes can be reused, and conflicting paths can be de-duplicated under certain key logical circumstances.
Process
A process refers to a graph node within the ETL system. Every process performs some operation for transforming data from any data source into a consumable input for the Risk Engine to ingest. Currently, there are three total process types. Two are listed here; the third, the Aggregator, is still in development:
- Subscriber - Used to perform local arbitrary computations (e.g. extracting L1Withdrawal transactions from a block)
- Reader - Used to poll and collect data from some third-party source (e.g. querying real-time account balance amounts from an op-geth execution client)
Inter-Connectivity
The diagram below showcases how interactivity between processes occurs:
Egress Handler
All process types use an egressHandler struct for routing transit data to actively subscribed downstream ETL processes.
Ingress Handler
All process types also use an ingressHandler struct for ingesting active transit data from upstream ETL processes.
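The relationship between the two handlers can be sketched in Go as follows. This is a minimal illustration, not the project's implementation: the TransitData envelope, the constructor names, and the AddRoute and Send methods are assumptions made for the example. An egress handler holds one outbound channel per subscribed downstream process, and each of those channels is the downstream process's ingress.

```go
package main

import (
	"errors"
	"fmt"
)

// TransitData is a hypothetical envelope for data moving between processes.
type TransitData struct {
	Type  string
	Value any
}

// ingressHandler buffers transit data arriving from upstream processes.
type ingressHandler struct {
	ch chan TransitData
}

func newIngress(size int) *ingressHandler {
	return &ingressHandler{ch: make(chan TransitData, size)}
}

// egressHandler keeps one outbound channel per subscribed downstream process.
type egressHandler struct {
	routes map[string]chan<- TransitData
}

func newEgress() *egressHandler {
	return &egressHandler{routes: make(map[string]chan<- TransitData)}
}

// AddRoute subscribes a downstream process by registering its ingress channel.
func (e *egressHandler) AddRoute(id string, in *ingressHandler) error {
	if _, ok := e.routes[id]; ok {
		return errors.New("route already exists: " + id)
	}
	e.routes[id] = in.ch
	return nil
}

// Send fans one piece of transit data out to every subscribed route.
// It blocks if a downstream ingress buffer is full.
func (e *egressHandler) Send(td TransitData) {
	for _, ch := range e.routes {
		ch <- td
	}
}

func main() {
	downstream := newIngress(1)
	eg := newEgress()
	_ = eg.AddRoute("downstream-1", downstream)

	eg.Send(TransitData{Type: "block_header", Value: 42})
	fmt.Println(<-downstream.ch)
}
```

Keying routes by a process identifier makes it cheap to reject a duplicate subscription, which is one reason a map was chosen over a slice in this sketch.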
Process ID
All processes have an ID that stores critical identification data. Process IDs are used by higher order abstractions to:
- Represent a process DAG
- Understand when duplicate processes are generated
A process ID consists of both a randomly generated UUID and a deterministic PID. The UUID ensures that each process instance is unique, while the deterministic PID collides for equivalent processes so that they can be reused when viable.
A ProcIdentifier is encoded using the following four byte sequence:
0        1        2        3        4
|--------|--------|--------|--------|
 network   path     process  register
 id        type     type     type
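As a rough illustration of the four-byte layout above, the identifier can be modeled as a fixed-size byte array in Go. The constructor, accessor names, and the numeric field values are hypothetical; only the field order (network id, path type, process type, register type) comes from the layout above.

```go
package main

import "fmt"

// ProcIdentifier mirrors the four-byte layout:
// byte 0 = network id, byte 1 = path type,
// byte 2 = process type, byte 3 = register type.
type ProcIdentifier [4]byte

// MakeProcID packs the four one-byte fields into an identifier.
func MakeProcID(network, pathType, procType, regType uint8) ProcIdentifier {
	return ProcIdentifier{network, pathType, procType, regType}
}

func (p ProcIdentifier) Network() uint8      { return p[0] }
func (p ProcIdentifier) PathType() uint8     { return p[1] }
func (p ProcIdentifier) ProcessType() uint8  { return p[2] }
func (p ProcIdentifier) RegisterType() uint8 { return p[3] }

// String renders the identifier as colon-separated decimal fields.
func (p ProcIdentifier) String() string {
	return fmt.Sprintf("%d:%d:%d:%d", p[0], p[1], p[2], p[3])
}

func main() {
	// The numeric values here are placeholders, not real enum values.
	a := MakeProcID(1, 0, 2, 3)
	b := MakeProcID(1, 0, 2, 3)

	// Arrays are comparable in Go, so two processes built from the same
	// inputs produce equal identifiers.
	fmt.Println(a, a == b)
}
```

Because the identifier is a comparable array, it can be used directly as a map key when looking for processes that can be reused.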
State Update Handling
NOTE - State handling policies by management abstractions have yet to be properly fleshed out
Subscriber
Subscribers are used to perform arbitrary transformations on some provided upstream input data. Once input data processing has been completed, the output data is then submitted to its respective destination(s).
Attributes
- An ActivityState channel with a path manager
- Ingress handler that other processes can write to
- TransformFunc - A processing function that performs some data translation/transformation on respective inputs
- An egressHandler that stores dependencies to write to (i.e. other path processes, heuristic engine)
- A specified output data type
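The attributes above can be sketched as a small Go type. This is an illustration under stated assumptions: the TransitData envelope, the TransformFunc signature, and the Run loop are hypothetical, and the ActivityState channel is omitted for brevity.

```go
package main

import "fmt"

// TransitData is a hypothetical envelope for data moving between processes.
type TransitData struct {
	Type  string
	Value any
}

// TransformFunc converts one upstream input into zero or more outputs.
// The signature is an assumption made for this sketch.
type TransformFunc func(in TransitData) ([]TransitData, error)

// Subscriber reads from its ingress, applies the transform, and forwards
// every output to each downstream channel in its egress list.
type Subscriber struct {
	ingress   chan TransitData
	egress    []chan<- TransitData
	transform TransformFunc
}

// Run is the event loop. It returns once the ingress channel is closed.
func (s *Subscriber) Run() {
	for in := range s.ingress {
		outs, err := s.transform(in)
		if err != nil {
			continue // a real implementation would log or surface the error
		}
		for _, out := range outs {
			for _, ch := range s.egress {
				ch <- out
			}
		}
	}
}

// double is a toy transform that doubles an integer input.
func double(in TransitData) ([]TransitData, error) {
	n, ok := in.Value.(int)
	if !ok {
		return nil, fmt.Errorf("expected int, got %T", in.Value)
	}
	return []TransitData{{Type: "doubled", Value: n * 2}}, nil
}

// runDouble pushes one value through a Subscriber and returns the result.
func runDouble(n int) TransitData {
	sink := make(chan TransitData, 1)
	s := &Subscriber{
		ingress:   make(chan TransitData, 1),
		egress:    []chan<- TransitData{sink},
		transform: double,
	}
	s.ingress <- TransitData{Type: "number", Value: n}
	close(s.ingress)
	s.Run()
	return <-sink
}

func main() {
	fmt.Println(runDouble(21))
}
```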
Example Use Case(s)
- Generating opcode traces for some EVM transaction
- Parsing emitted events from a transaction
Reader
Readers are responsible for collecting data from some external third party (e.g. L1 geth node, L2 rollup node, etc.). As of now, readers are configurable through a standard OracleDefinition interface that allows developers to write arbitrary reader logic.
The following key interface functions are supported/enforced:
- ReadRoutine - Routine used for reading/polling real-time data for some arbitrarily configured data source
Unlike other processes, Readers actually employ two goroutines to operate safely. This is because the definition routines are run as a separate goroutine with a communication channel to the actual Reader event loop. This is visualized below:
Attributes
- A communication channel with the path manager
- Poller/subscription logic that performs real-time data reads on some third-party source
- An egressHandler that stores dependencies to write to (i.e. other path processes, heuristic engine)
- A specified output data type
- (Optional) Interface with some storage (postgres, mongo, etc.) to persist live extracted data
- (Optional) Backtest support for polling some data between some starting and ending block heights
- (Optional) Use of an application state cache to understand which parameter sets to sequentially feed to an endpoint
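The two-goroutine arrangement described for Readers can be sketched as follows. The ReadRoutine signature, the counterDef stand-in data source, and the bounded EventLoop are assumptions made so the example terminates; they are not taken from the project's code. One goroutine runs the definition's read routine, and the Reader event loop receives from it over a channel and forwards the data downstream.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// TransitData is a hypothetical envelope for data moving between processes.
type TransitData struct {
	Type  string
	Value any
}

// OracleDefinition is a reduced, assumed form of the interface named above.
type OracleDefinition interface {
	ReadRoutine(ctx context.Context, out chan<- TransitData) error
}

// counterDef stands in for a real data source and emits increasing heights.
type counterDef struct{ interval time.Duration }

func (c counterDef) ReadRoutine(ctx context.Context, out chan<- TransitData) error {
	ticker := time.NewTicker(c.interval)
	defer ticker.Stop()
	for height := 0; ; height++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			select {
			case out <- TransitData{Type: "height", Value: height}:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// Reader owns the event loop that forwards definition output downstream.
type Reader struct {
	def    OracleDefinition
	egress chan<- TransitData
}

// EventLoop starts ReadRoutine in its own goroutine and forwards n items.
// The bound n exists only so that this example terminates.
func (r *Reader) EventLoop(ctx context.Context, n int) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stops the read routine when the loop exits

	feed := make(chan TransitData)
	go func() { _ = r.def.ReadRoutine(ctx, feed) }()

	for i := 0; i < n; i++ {
		select {
		case td := <-feed:
			r.egress <- td
		case <-ctx.Done():
			return
		}
	}
}

// collect runs a Reader until n heights have been forwarded.
func collect(n int) []int {
	sink := make(chan TransitData, n)
	r := &Reader{def: counterDef{interval: time.Millisecond}, egress: sink}
	r.EventLoop(context.Background(), n)
	close(sink)

	heights := make([]int, 0, n)
	for td := range sink {
		heights = append(heights, td.Value.(int))
	}
	return heights
}

func main() {
	fmt.Println(collect(3))
}
```

Separating the read routine from the event loop means a slow or blocking data source cannot stall the loop's ability to react to cancellation.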
Example Use Case(s)
- Polling layer-2 block data in real-time for state updates
- Interval polling user provided chain addresses for native ETH amounts
(TBD) Aggregator
NOTE - This process type is still in development
Aggregators are used to solve the problem where a Subscriber or a heuristic input requires multiple sources of data to perform an execution sequence. Since aggregators subscribe to more than one data stream with different output frequencies, they must employ a synchronization policy for collecting and propagating multi-data inputs within a highly asynchronous environment.
Attributes
- Able to read heterogeneous transit data from an arbitrary number of process ingresses
- A synchronization policy that defines how different transit data from multiple ingress streams will be aggregated into a collectively bound single piece of data
- EgressHandler to handle downstream transit data routing to other processes or destinations
Single Value Subscription
Only send output at the update of a single ingress stream
Single Value Subscription refers to a synchronization policy where a bucketed multi-data tuple is submitted every time there’s an update to a single input data queue.
For example, we can have a heuristic that subscribes to blocks from two heterogeneous chains (layer1, layer2) or {ChainA, ChainB}. Let’s assume BLOCK_TIME(ChainA) > BLOCK_TIME(ChainB).
We can either specify that the heuristic will run every time there’s an update or a new block from ChainA:
{
"A:latest_blocks": [xi] where cardinality = 1,
"B:latest_blocks": [yj, ..., yn] where cardinality >= 1,
}
Or we can specify the inverse, running every time there’s a new block from ChainB:
{
"A:latest_blocks": [NULL OR xi] where cardinality <= 1,
"B:latest_blocks": [yj] where cardinality = 1,
}
This should be extendable to any number of heterogeneous data sources.
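Since the Aggregator is still in development, the following is only a sketch of how a Single Value Subscription policy could behave, not a description of existing code. All type and function names are hypothetical. Updates from non-primary sources are buffered; an update from the designated primary source flushes the buffer as one bucketed tuple.

```go
package main

import "fmt"

// Update is one value arriving on a named ingress stream.
type Update struct {
	Source string
	Value  int
}

// Bucket maps a source name to the values collected since the last emission.
type Bucket map[string][]int

// SingleValueSync emits a bucket only when the primary source updates.
type SingleValueSync struct {
	primary string
	pending Bucket
}

func NewSingleValueSync(primary string) *SingleValueSync {
	return &SingleValueSync{primary: primary, pending: make(Bucket)}
}

// Push records an update. It returns a bucket and true only when the update
// comes from the primary source; other updates are buffered until then.
func (s *SingleValueSync) Push(u Update) (Bucket, bool) {
	if u.Source != s.primary {
		s.pending[u.Source] = append(s.pending[u.Source], u.Value)
		return nil, false
	}
	out := s.pending
	out[s.primary] = []int{u.Value} // the primary always has cardinality 1
	s.pending = make(Bucket)        // start a fresh bucket for the next round
	return out, true
}

func main() {
	// ChainA is the slower chain and drives emission, as in the first case.
	sync := NewSingleValueSync("A:latest_blocks")

	sync.Push(Update{Source: "B:latest_blocks", Value: 1})
	sync.Push(Update{Source: "B:latest_blocks", Value: 2})
	bucket, ok := sync.Push(Update{Source: "A:latest_blocks", Value: 10})

	fmt.Println(ok, bucket)
}
```

Choosing the faster chain as the primary instead yields the inverse case: the slower chain's entry is usually empty and holds at most one value whenever the slower chain produces no more than one block between primary updates.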
Registry
A registry submodule is used to store all ETL data register definitions that provide the blueprint for a unique ETL process type. A register definition consists of:
- DataType - The output data type of the process node. This is used for data serialization/deserialization by both the ETL and Risk Engine subsystems.
- ProcessType - The type of process being invoked (e.g. Oracle).
- ProcessConstructor - Constructor function used to create unique process instances. All processes must implement the Process interface.
- Dependencies - Ordered slice of data register dependencies that are necessary for the process to operate. For example, a process that requires a geth block would have a dependency list of [geth.block]. This dependency list is used to ensure that the ETL can properly construct a process graph that satisfies all process dependencies.
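A register definition and its dependency resolution might be sketched as below. The field names follow the list above, but the concrete Go types, the one-method Process interface, the stub constructor, and the contract.event register are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

type RegisterType string
type ProcessType string

// Process is reduced to a single method for this sketch.
type Process interface {
	ID() string
}

// DataRegister is the blueprint for one ETL process type.
type DataRegister struct {
	DataType           RegisterType
	ProcessType        ProcessType
	ProcessConstructor func() (Process, error)
	Dependencies       []RegisterType
}

// Registry maps a register type to its definition.
type Registry map[RegisterType]*DataRegister

// DependencyPath returns rt followed by its dependencies, depth-first.
// It assumes the definitions are acyclic and does not guard against cycles.
func (r Registry) DependencyPath(rt RegisterType) ([]RegisterType, error) {
	reg, ok := r[rt]
	if !ok {
		return nil, errors.New("unknown register: " + string(rt))
	}
	path := []RegisterType{rt}
	for _, dep := range reg.Dependencies {
		sub, err := r.DependencyPath(dep)
		if err != nil {
			return nil, err
		}
		path = append(path, sub...)
	}
	return path, nil
}

// stubProcess is a placeholder process used by the demo constructors.
type stubProcess struct{ id string }

func (s stubProcess) ID() string { return s.id }

func newStub(id string) func() (Process, error) {
	return func() (Process, error) { return stubProcess{id: id}, nil }
}

// demoRegistry holds one Reader register and one hypothetical Subscriber
// register that depends on it.
func demoRegistry() Registry {
	return Registry{
		"geth.block": {
			DataType:           "geth.block",
			ProcessType:        "reader",
			ProcessConstructor: newStub("block-reader"),
		},
		"contract.event": {
			DataType:           "contract.event",
			ProcessType:        "subscriber",
			ProcessConstructor: newStub("event-subscriber"),
			Dependencies:       []RegisterType{"geth.block"},
		},
	}
}

func main() {
	path, err := demoRegistry().DependencyPath("contract.event")
	fmt.Println(path, err)
}
```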
Addressing
Some processes require knowledge of a specific address to function properly. For example, a reader that polls a geth node for native ETH balance amounts would need knowledge of the address to poll. To support this, the ETL leverages a shared state store between the ETL and Risk Engine subsystems.
Shown below is how the ETL and Risk Engine interact with the shared state store using a BalanceOracle process as an example:
Geth Block Reader Register
A BlockHeader register refers to a block output extracted from a go-ethereum node. This register is used for creating Reader processes that poll and extract block data from a go-ethereum node in real-time.
Geth Account Balance Reader Register
An AccountBalance register refers to a native ETH balance output extracted from a go-ethereum node. This register is used for creating Reader processes that poll and extract native ETH balance data for some state persisted addresses from a go-ethereum node in real-time.
Unlike the BlockHeader register, this register requires knowledge of an address set that’s shared with the Risk Engine to function properly and is therefore addressable. Because of this, any heuristic that uses this register must also be addressable.
Managed ETL
Process Graph
The ETL uses a ProcessGraph construct to represent and store critical process inter-connectivity data (i.e. process node entries and graph edges).
A graph edge is represented as a bound communication path between two arbitrary process nodes (c1, c2). Adding an edge from some process (c1) to some downstream process (c2) results in c1 having a path to the ingress of c2 in its egress handler. This would look something like:
NOTE: The process graph used in the ETL is represented as a DAG (Directed Acyclic Graph), meaning that no cyclic edge relationships should exist between two processes (c1, c2) where c1-->c2 && c2-->c1. While there are no explicit checks for this in the code, it should be impossible given that all processes declare entrypoint register dependencies within their metadata. A process could only become part of a cycle if a process registry definition declared the inverse input->output of an existing process.
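The note above states that the code performs no explicit cycle check. Purely as an illustration of what such a guard could look like, the sketch below rejects any edge that would make the graph cyclic. The ProcessGraph here is a hypothetical adjacency-list stand-in keyed by string identifiers, not the project's type.

```go
package main

import (
	"errors"
	"fmt"
)

// ProcessGraph stores directed edges as an adjacency list.
type ProcessGraph struct {
	edges map[string][]string
}

func NewProcessGraph() *ProcessGraph {
	return &ProcessGraph{edges: make(map[string][]string)}
}

// reachable reports whether `to` can be reached by following edges from
// `from`. The seen map prevents revisiting nodes.
func (g *ProcessGraph) reachable(from, to string, seen map[string]bool) bool {
	if from == to {
		return true
	}
	if seen[from] {
		return false
	}
	seen[from] = true
	for _, next := range g.edges[from] {
		if g.reachable(next, to, seen) {
			return true
		}
	}
	return false
}

// AddEdge adds c1 -> c2 unless c1 is already reachable from c2, in which
// case the new edge would close a cycle and is rejected.
func (g *ProcessGraph) AddEdge(c1, c2 string) error {
	if g.reachable(c2, c1, map[string]bool{}) {
		return errors.New("edge " + c1 + " -> " + c2 + " would create a cycle")
	}
	g.edges[c1] = append(g.edges[c1], c2)
	return nil
}

func main() {
	g := NewProcessGraph()
	fmt.Println(g.AddEdge("c1", "c2")) // accepted
	fmt.Println(g.AddEdge("c2", "c1")) // rejected: c1 -> c2 already exists
}
```

The reachability test also covers longer cycles (c1 to c2 to c3 and back to c1) and self-loops, not only the two-node case described in the note.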
Path
Paths are used to represent some full process path in a DAG-based ProcessGraph. A path is a sequence of processes that are connected together in a way to express meaningful ETL operations for extracting some heuristic input for consumption by the Risk Engine.
Path States
- Backfill - Backfill denotes that the path is currently performing a backfill operation. This means the path is sequentially reading data from some starting height to the most recent block height. This is useful for building state dependent paths that require some knowledge of prior history to make live assessments. For example, detecting imbalances between the native ETH deposit supply on the L1 portal contract and the TVL unlocked on the L2 chain would require indexing the prior history of L1 deposits to construct correct supply values.
- Live - Live denotes that the path is currently performing live operations. This means the path is reading data from the most recent block height.
- Stopped - Stopped denotes that the path is currently not performing any operations. This means the path is neither reading nor processing any data.
- Paused - Paused denotes that the path is currently not performing any operations. This means the path is neither reading nor processing any data. The difference between Stopped and Paused is that a Paused path can be resumed at any time while a Stopped path must be restarted.
- Error - Error denotes that the path is currently in an error state. This means the path is neither reading nor processing any data. The difference between Stopped and Error is that an Error path can be resumed at any time while a Stopped path must be restarted.
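The five states lend themselves to a small enumeration. The sketch below is an assumption about how they might be modeled in Go; the type name is borrowed from the ActivityState channel mentioned earlier, and the Resumable and Active helpers are hypothetical conveniences that encode the distinctions stated in the descriptions above.

```go
package main

import "fmt"

// ActivityState enumerates the path states described above.
type ActivityState int

const (
	Backfill ActivityState = iota
	Live
	Stopped
	Paused
	Error
)

// String returns a lowercase label for the state.
func (s ActivityState) String() string {
	switch s {
	case Backfill:
		return "backfill"
	case Live:
		return "live"
	case Stopped:
		return "stopped"
	case Paused:
		return "paused"
	case Error:
		return "error"
	default:
		return "unknown"
	}
}

// Resumable reports whether a path in this state can be resumed.
// Per the descriptions, Paused and Error paths can be resumed at any time,
// while a Stopped path must be restarted.
func (s ActivityState) Resumable() bool {
	return s == Paused || s == Error
}

// Active reports whether the path is currently reading data.
func (s ActivityState) Active() bool {
	return s == Backfill || s == Live
}

func main() {
	for _, s := range []ActivityState{Backfill, Live, Stopped, Paused, Error} {
		fmt.Printf("%-8s active=%-5t resumable=%t\n", s, s.Active(), s.Resumable())
	}
}
```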
Path Types
There are two types of paths:
Live
A live path is a path that is actively running and performing ETL operations on some data fetched in real-time. For example, a live path could be used to extract newly curated block data from a go-ethereum node.
Backtest
A backtest path is a path that is used to sequentially backtest some process sequence from some starting to ending block height. For example, a backtest path could be used to backtest a balance_enforcement heuristic between L1 block heights 0 to 1000.
Path UUID (PathID)
All paths have a PathID that stores critical identification data. Path UUIDs are used by higher order abstractions to:
- Route heuristic inputs between the ETL and Risk Engine
- Understand when path collisions between PIDs occur
A PathID consists of both a randomly generated UUID and a deterministic PID. The UUID ensures that each path instance is unique, while the deterministic PID collides for equivalent paths so that overlapping processes can be deduplicated when viable.
A PathPID is encoded using the following 9 byte array sequence:
0        1                                        5                                        9
|--------|----------------------------------------|----------------------------------------|
 path      first path process PID sequence          last path process PID sequence
 type
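The nine-byte layout above (one byte of path type, then the four-byte PID of the first process, then the four-byte PID of the last process) can be sketched in Go as follows. The constructor, accessor names, and numeric field values are hypothetical.

```go
package main

import "fmt"

// ProcIdentifier is the four-byte process PID.
type ProcIdentifier [4]byte

// PathPID is the nine-byte path PID:
// byte 0 = path type, bytes 1-4 = first process PID,
// bytes 5-8 = last process PID.
type PathPID [9]byte

// MakePathPID packs the path type and the two process PIDs.
func MakePathPID(pathType uint8, first, last ProcIdentifier) PathPID {
	var p PathPID
	p[0] = pathType
	copy(p[1:5], first[:])
	copy(p[5:9], last[:])
	return p
}

// PathType returns the leading path type byte.
func (p PathPID) PathType() uint8 { return p[0] }

// First returns the PID of the first process in the path.
func (p PathPID) First() ProcIdentifier {
	var id ProcIdentifier
	copy(id[:], p[1:5])
	return id
}

// Last returns the PID of the last process in the path.
func (p PathPID) Last() ProcIdentifier {
	var id ProcIdentifier
	copy(id[:], p[5:9])
	return id
}

func main() {
	// Field values are placeholders, not real enum values.
	first := ProcIdentifier{1, 0, 1, 2}
	last := ProcIdentifier{1, 0, 2, 3}

	a := MakePathPID(0, first, last)
	b := MakePathPID(0, first, last)

	// Identical process sequences yield identical PIDs, which is what makes
	// path collisions detectable.
	fmt.Println(a == b, a.First(), a.Last())
}
```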
Collision Analysis
NOTE - This section is still in-development
Path collisions occur when two paths with the same PID are generated. This can occur when two paths have identical process sequences and valid stateful properties.
For some path collision to occur between two paths (P0, P1), the following properties must hold true:
- P0 must have the same PID as P1
- P0 and P1 must be live paths that aren’t performing backtests or backfilling operations
Once a collision is detected, the ETL will attempt to deduplicate the path by:
- Stopping the event loop of P1
- Removing the PID of P1 from the path manager
- Merging shared state from P1 to P0
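Because this section is still in development, the following is only a sketch of how the collision conditions and the three deduplication steps above could fit together. Every type and field here is hypothetical: in particular, the manager is keyed by UUID, a path's shared state is modeled as a set of addresses, and the event loop is reduced to a Running flag.

```go
package main

import "fmt"

// PathPID is the nine-byte deterministic path identifier.
type PathPID [9]byte

// Path is a simplified stand-in for a running ETL path.
type Path struct {
	UUID    string
	PID     PathPID
	Live    bool            // true when not backtesting or backfilling
	Running bool            // stands in for an active event loop
	State   map[string]bool // stands in for shared state (e.g. an address set)
}

func newPath(uuid string, pidByte byte, live bool, addrs ...string) *Path {
	p := &Path{UUID: uuid, Live: live, Running: true, State: map[string]bool{}}
	p.PID[0] = pidByte
	for _, a := range addrs {
		p.State[a] = true
	}
	return p
}

// PathManager tracks active paths by UUID.
type PathManager struct {
	paths map[string]*Path
}

func NewPathManager(ps ...*Path) *PathManager {
	m := &PathManager{paths: make(map[string]*Path)}
	for _, p := range ps {
		m.paths[p.UUID] = p
	}
	return m
}

// Collides applies the two conditions listed above.
func Collides(p0, p1 *Path) bool {
	return p0.PID == p1.PID && p0.Live && p1.Live
}

// Deduplicate folds p1 into p0 when they collide and reports whether it did.
func (m *PathManager) Deduplicate(p0, p1 *Path) bool {
	if !Collides(p0, p1) {
		return false
	}
	p1.Running = false       // 1. stop the event loop of P1
	delete(m.paths, p1.UUID) // 2. remove P1 from the path manager
	for k := range p1.State { // 3. merge shared state from P1 to P0
		p0.State[k] = true
	}
	return true
}

func main() {
	p0 := newPath("uuid-0", 1, true, "0xaaa")
	p1 := newPath("uuid-1", 1, true, "0xbbb")
	m := NewPathManager(p0, p1)

	fmt.Println(m.Deduplicate(p0, p1), len(m.paths), len(p0.State))
}
```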
ETL Manager
EtlManager is used for connecting lower-level objects (Process Graph, Path) together in a way to express meaningful ETL administration logic, i.e.:
- Creating a new path
- Removing a path
- Merging some paths
- Updating a path