Workers & Indexing
Workers are background processes that continuously fetch, transform, and store DeFi data. They're the engine that keeps the platform's data fresh.
Architecture
The backend uses a multi-tier worker pattern coordinated by two main components:
- DefiWorkerPool — manages all per-protocol pool services
- PositionCoordinator — orchestrates the three position workers (LogListener, LogProcessor, GapFiller)
DefiWorkerPool
|
|--- AaveService (every 30 min)
|--- YearnService (every 30 min)
|--- UniswapV3Service (every 2 min)
|--- UniswapV4Service (every 2 min)
|--- MorphoService (every 30 min)
|--- AerodromeService (every 30 min)
|--- AerodromeSlipstreamService (every 2 min)
PositionCoordinator
|
|--- LogListener (real-time WebSocket events)
|--- LogProcessor (parses and stores detected positions)
|--- GapFiller (backfills missed blocks + valuation recalcs)
Pool Workers
Per-protocol services that fetch metadata about DeFi pools (liquidity pools, lending markets, vaults). This data powers the "Earn" discovery page where users browse available DeFi pools and vaults. Displayed rates are variable and not indicative of future performance.
What they collect:
- Pool addresses and token pairs
- Total Value Locked (TVL)
- Current APY/APR
- Trading volume
- Fee tiers
Schedule: Most protocols update every 30 minutes. Uniswap V3/V4 and Aerodrome Slipstream update every 2 minutes because concentrated liquidity data changes more frequently.
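The schedule above can be pictured as a simple mapping from service to refresh interval. The following is a minimal sketch only: the `PoolService` enum and the `refresh_interval` helper are hypothetical names chosen for illustration, and the actual scheduling code in DefiWorkerPool may be organized differently.

```rust
use std::time::Duration;

/// Pool services listed in the DefiWorkerPool diagram (hypothetical enum for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolService {
    Aave,
    Yearn,
    UniswapV3,
    UniswapV4,
    Morpho,
    Aerodrome,
    AerodromeSlipstream,
}

/// Returns how often a pool service refreshes its data (hypothetical helper).
pub fn refresh_interval(service: PoolService) -> Duration {
    match service {
        // Concentrated-liquidity protocols refresh every 2 minutes.
        PoolService::UniswapV3
        | PoolService::UniswapV4
        | PoolService::AerodromeSlipstream => Duration::from_secs(2 * 60),
        // All other protocols refresh every 30 minutes.
        PoolService::Aave
        | PoolService::Yearn
        | PoolService::Morpho
        | PoolService::Aerodrome => Duration::from_secs(30 * 60),
    }
}
```

Using an exhaustive `match` means that adding a new service forces an explicit decision about its interval at compile time.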
Position Workers
Position tracking is split into three coordinated components:
LogListener
Subscribes to blockchain events via WebSocket (Alchemy WSS) across all supported chains. Listens for relevant contract events like Supply, Mint, Burn, and Withdraw, then streams them to the LogProcessor.
LogProcessor
Receives event batches from LogListener and GapFiller. Parses blockchain events, matches them against tracked pools and wallets, and detects position flows (opens, closes, deposits, withdrawals). Writes results to the nmt_position and nmt_position_flow tables.
GapFiller
Periodically fills gaps in position data by fetching logs from the blockchain for blocks that may have been missed. Also runs backfill operations to recalculate position valuations at fixed intervals (every 24 hours).
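How GapFiller decides which blocks were missed is not described here. One plausible approach, sketched below under that assumption, is to compare the set of already-processed block numbers against the expected range and collect the missing stretches. The `missing_ranges` function is hypothetical and not taken from the codebase.

```rust
use std::collections::BTreeSet;
use std::ops::RangeInclusive;

/// Returns the inclusive block ranges within `from..=to` that are absent
/// from `processed`. Hypothetical helper; walks every block in the range,
/// which is fine for a sketch but not tuned for very large ranges.
pub fn missing_ranges(
    processed: &BTreeSet<u64>,
    from: u64,
    to: u64,
) -> Vec<RangeInclusive<u64>> {
    let mut gaps = Vec::new();
    let mut gap_start: Option<u64> = None;

    for block in from..=to {
        if processed.contains(&block) {
            // A processed block closes any gap that is currently open.
            if let Some(start) = gap_start.take() {
                gaps.push(start..=block - 1);
            }
        } else if gap_start.is_none() {
            // First missing block after a processed one opens a new gap.
            gap_start = Some(block);
        }
    }

    // A gap that runs to the end of the range is closed at `to`.
    if let Some(start) = gap_start {
        gaps.push(start..=to);
    }
    gaps
}
```

Each returned range could then be passed to a log-fetching call and the resulting events handed to LogProcessor, as the GapFiller description suggests.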
Valuation Model
Position valuations combine two data points:
- Current USD value — computed per-protocol (on-chain reads + price fetches)
- Deposit and reward amounts — derived from position flows
The flow-based valuation model tracks every deposit and withdrawal as an nmt_position_flow record with a direction (Deposit/Withdraw) and USD amount. At valuation time, the system replays all flows to compute:
- deposit_usd: total USD deposited
- reward_usd: net gain, computed as current_value + withdrawn - deposited
This replaced the earlier approach where deposits and rewards were estimated separately per protocol. The unified model lives in workers/src/defi/services/common/valuations.rs and workers/src/defi/positions/flows.rs.
Related changes:
- Valuation metadata was unified across all protocols into a shared struct
- Pools support soft delete — hidden/unhidden rather than permanently removed, so historical valuations remain intact
- Unnecessary rounding operations were removed to preserve precision
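The flow replay described in this section can be sketched as follows. This is an illustration under stated assumptions, not the code in valuations.rs or flows.rs: the `Flow`, `Direction`, and `Valuation` types and the `valuate_flows` function are hypothetical, and `f64` is used only to keep the example dependency-free (the DefiService trait suggests the real code works with a `Decimal` type).

```rust
/// Direction of a position flow, mirroring the Deposit/Withdraw values described above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Direction {
    Deposit,
    Withdraw,
}

/// Simplified stand-in for an nmt_position_flow record (hypothetical shape).
#[derive(Debug, Clone, Copy)]
pub struct Flow {
    pub direction: Direction,
    pub usd_amount: f64,
}

/// Result of replaying all flows for one position (hypothetical shape).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Valuation {
    pub deposit_usd: f64,
    pub withdrawn_usd: f64,
    pub reward_usd: f64,
}

/// Replays every flow and applies
/// reward_usd = current_value + withdrawn - deposited.
pub fn valuate_flows(flows: &[Flow], current_value_usd: f64) -> Valuation {
    let mut deposited = 0.0;
    let mut withdrawn = 0.0;
    for flow in flows {
        match flow.direction {
            Direction::Deposit => deposited += flow.usd_amount,
            Direction::Withdraw => withdrawn += flow.usd_amount,
        }
    }
    Valuation {
        deposit_usd: deposited,
        withdrawn_usd: withdrawn,
        reward_usd: current_value_usd + withdrawn - deposited,
    }
}
```

For example, a position with deposits of $1,000 and $500, one withdrawal of $200, and a current value of $1,400 yields deposit_usd = 1,500 and reward_usd = 1,400 + 200 - 1,500 = 100.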
Price Fetching
Token prices are fetched on demand through a PriceFetcher utility rather than by a background worker. When other workers (pool services, valuation backfills) need current prices, they call the PriceFetcher, which queries the CoinGecko Pro API. A fallback price fetcher covers cases where CoinGecko data is unavailable.
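The primary-then-fallback lookup can be illustrated with a small sketch. Everything here is an assumption made for illustration: the `PriceSource` trait, the `StaticPrices` in-memory source, and the `FallbackFetcher` wrapper are hypothetical and do not reflect the real PriceFetcher API, which talks to remote services.

```rust
use std::collections::HashMap;

/// A source of USD token prices (hypothetical trait for this sketch).
pub trait PriceSource {
    fn price_usd(&self, token: &str) -> Option<f64>;
}

/// In-memory stand-in for a remote price API such as CoinGecko.
pub struct StaticPrices(pub HashMap<String, f64>);

impl PriceSource for StaticPrices {
    fn price_usd(&self, token: &str) -> Option<f64> {
        self.0.get(token).copied()
    }
}

/// Tries the primary source first and consults the fallback only on a miss.
pub struct FallbackFetcher<P, F> {
    pub primary: P,
    pub fallback: F,
}

impl<P: PriceSource, F: PriceSource> PriceSource for FallbackFetcher<P, F> {
    fn price_usd(&self, token: &str) -> Option<f64> {
        self.primary
            .price_usd(token)
            .or_else(|| self.fallback.price_usd(token))
    }
}
```

Because `FallbackFetcher` itself implements `PriceSource`, callers do not need to know whether a price came from the primary or the fallback source.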
Protocol Service Pattern
Every protocol follows the same service pattern, defined by the DefiService trait:
trait DefiService {
    // Fetch all pools for this protocol
    async fn index_pools(&self) -> Result<Vec<NmtPool>>;

    // Detect positions for a specific user
    async fn index_positions(&self, user: &Address) -> Result<Vec<NmtPosition>>;

    // Calculate the value of a position
    async fn valuate(&self, position: &NmtPosition) -> Result<Decimal>;
}
Each protocol implements this trait:
| Service | Location | Pool Source |
|---|---|---|
| Uniswap V3 | workers/src/defi/services/uniswap_v3/ | Krystal API |
| Uniswap V4 | workers/src/defi/services/uniswap_v4/ | Krystal API |
| Uniswap V2 | workers/src/defi/services/uniswap_v2/ | Krystal API |
| Aave V3 | workers/src/defi/services/aave/ | Aave GraphQL |
| Morpho | workers/src/defi/services/morpho/ | The Graph + headless Chrome |
| Yearn | workers/src/defi/services/yearn/ | yDaemon API |
| Aerodrome | workers/src/defi/services/aerodrome/ | On-chain (curated list) |
| Aerodrome Slipstream | workers/src/defi/services/aerodrome_slipstream/ | On-chain |
TVL Filters
Not every pool on every protocol is worth tracking. The backend applies minimum TVL (Total Value Locked) filters to avoid indexing dust pools:
| Protocol | Min TVL | Rationale |
|---|---|---|
| Aave V3 | $0 | All markets relevant |
| Yearn | $0 | Curated vaults |
| Uniswap V2 | $100,000 | Filter inactive pairs |
| Uniswap V3/V4 | $500,000 | Focus on liquid pools |
| Morpho | $300,000 | MetaMorpho vaults only |
| Aerodrome Slipstream | $500,000 | CL pools only |
| Aerodrome (basic) | $0 | Curated list of 15 pools |
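As an illustration, the thresholds in the table could be applied with a lookup and a comparison. The `Protocol` enum and both functions below are hypothetical names, and treating the threshold as inclusive (a pool exactly at the minimum passes) is an assumption, since the boundary case is not specified here.

```rust
/// Protocols from the TVL filter table (hypothetical enum for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    AaveV3,
    Yearn,
    UniswapV2,
    UniswapV3,
    UniswapV4,
    Morpho,
    AerodromeSlipstream,
    AerodromeBasic,
}

/// Minimum TVL in USD a pool needs in order to be indexed.
pub fn min_tvl_usd(protocol: Protocol) -> f64 {
    match protocol {
        Protocol::AaveV3 | Protocol::Yearn | Protocol::AerodromeBasic => 0.0,
        Protocol::UniswapV2 => 100_000.0,
        Protocol::Morpho => 300_000.0,
        Protocol::UniswapV3 | Protocol::UniswapV4 | Protocol::AerodromeSlipstream => 500_000.0,
    }
}

/// Returns true when the pool's TVL meets the protocol's minimum.
/// Assumption: the comparison is inclusive.
pub fn passes_tvl_filter(protocol: Protocol, tvl_usd: f64) -> bool {
    tvl_usd >= min_tvl_usd(protocol)
}
```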
APY Safety Caps
To filter out erroneous data, the backend enforces a maximum plausible APY per protocol:
| Protocol | Max APY |
|---|---|
| Most protocols | 300% |
| Aerodrome Slipstream | 2,000% |
Any APY above these limits is discarded as likely incorrect.
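A minimal sketch of this check follows. The constants mirror the table, while the `sanitize_apy` function, its boolean parameter, and the choice to also reject non-finite values (NaN, infinity) are assumptions made for illustration.

```rust
/// Maximum APY, in percent, accepted for most protocols.
pub const DEFAULT_MAX_APY_PCT: f64 = 300.0;
/// Maximum APY, in percent, accepted for Aerodrome Slipstream.
pub const SLIPSTREAM_MAX_APY_PCT: f64 = 2_000.0;

/// Returns the APY unchanged when it is within the limit, or `None` when it
/// should be discarded. Hypothetical helper; rejecting non-finite input is
/// an extra assumption beyond what the table states.
pub fn sanitize_apy(apy_pct: f64, is_slipstream: bool) -> Option<f64> {
    let max = if is_slipstream {
        SLIPSTREAM_MAX_APY_PCT
    } else {
        DEFAULT_MAX_APY_PCT
    };
    if apy_pct.is_finite() && apy_pct <= max {
        Some(apy_pct)
    } else {
        None
    }
}
```

Returning `Option<f64>` rather than clamping to the limit matches the stated behavior: out-of-range values are dropped, not reduced to the maximum.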
Concurrency Settings
| Setting | Default | Purpose |
|---|---|---|
| Ingestion concurrency | 30 | Parallel data fetch workers |
| Confirmation concurrency | 5 | Parallel blockchain confirmations |
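To show what a concurrency limit means in practice, here is a dependency-free sketch that runs jobs in batches of at most `limit` threads. It is only an illustration: the constant names and `run_bounded` are hypothetical, the real workers are async and likely rely on their runtime's own limiting primitives, and batching is simpler than a true semaphore because each batch waits for its slowest job.

```rust
use std::thread;

/// Defaults from the settings table (hypothetical constant names).
pub const INGESTION_CONCURRENCY: usize = 30;
pub const CONFIRMATION_CONCURRENCY: usize = 5;

/// Applies `job` to every item with at most `limit` jobs running at once.
/// Results are returned in the same order as `items`.
pub fn run_bounded<T, R, F>(items: &[T], limit: usize, job: F) -> Vec<R>
where
    T: Sync,
    R: Send,
    F: Fn(&T) -> R + Sync,
{
    let job = &job;
    let mut results = Vec::with_capacity(items.len());
    // Each chunk holds at most `limit` items, so at most `limit` threads exist at a time.
    for chunk in items.chunks(limit.max(1)) {
        let chunk_results: Vec<R> = thread::scope(|scope| {
            // Spawn one scoped thread per item in the chunk.
            let handles: Vec<_> = chunk
                .iter()
                .map(|item| scope.spawn(move || job(item)))
                .collect();
            // Join in spawn order to keep results aligned with the input.
            handles
                .into_iter()
                .map(|handle| handle.join().expect("job panicked"))
                .collect()
        });
        results.extend(chunk_results);
    }
    results
}
```

A call such as `run_bounded(&pools, INGESTION_CONCURRENCY, fetch)` would then process a hypothetical `pools` slice 30 items at a time.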