Workers & Indexing

Workers are background processes that continuously fetch, transform, and store DeFi data. They're the engine that keeps the platform's data fresh.

Architecture

The backend uses a multi-tier worker pattern coordinated by two main components:

  • DefiWorkerPool: manages all per-protocol pool services
  • PositionCoordinator: orchestrates the three position workers (LogListener, LogProcessor, GapFiller)

DefiWorkerPool
    |
    |--- AaveService (every 30 min)
    |--- YearnService (every 30 min)
    |--- UniswapV3Service (every 2 min)
    |--- UniswapV4Service (every 2 min)
    |--- MorphoService (every 30 min)
    |--- AerodromeService (every 30 min)
    |--- AerodromeSlipstreamService (every 2 min)

PositionCoordinator
    |
    |--- LogListener (real-time WebSocket events)
    |--- LogProcessor (parses and stores detected positions)
    |--- GapFiller (backfills missed blocks + valuation recalcs)

Pool Workers

Pool workers are per-protocol services that fetch metadata about DeFi pools (liquidity pools, lending markets, vaults). This data powers the "Earn" discovery page, where users browse available yield opportunities.

What they collect:

  • Pool addresses and token pairs
  • Total Value Locked (TVL)
  • Current APY/APR
  • Trading volume
  • Fee tiers

Schedule: Most protocols update every 30 minutes. Uniswap V3/V4 and Aerodrome Slipstream update every 2 minutes because concentrated liquidity data changes more frequently.
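The schedule above can be expressed as a simple lookup. This is a minimal sketch, not the backend's actual code: the Protocol enum and the refresh_interval function are hypothetical names introduced here for illustration, and only the interval values come from the schedule described above.

```rust
use std::time::Duration;

/// Hypothetical enum mirroring the services in the worker pool diagram.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    Aave,
    Yearn,
    UniswapV3,
    UniswapV4,
    Morpho,
    Aerodrome,
    AerodromeSlipstream,
}

/// Hypothetical helper: how often a protocol's pool service refreshes.
pub fn refresh_interval(protocol: Protocol) -> Duration {
    match protocol {
        // Concentrated-liquidity protocols refresh every 2 minutes.
        Protocol::UniswapV3 | Protocol::UniswapV4 | Protocol::AerodromeSlipstream => {
            Duration::from_secs(2 * 60)
        }
        // Every other protocol refreshes every 30 minutes.
        Protocol::Aave | Protocol::Yearn | Protocol::Morpho | Protocol::Aerodrome => {
            Duration::from_secs(30 * 60)
        }
    }
}
```

Matching exhaustively (rather than using a catch-all arm) means that adding a new protocol forces an explicit decision about its refresh interval.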

Position Workers

Position tracking is split into three coordinated components:

LogListener

Subscribes to blockchain events via WebSocket (Alchemy WSS) across all supported chains. Listens for relevant contract events like Supply, Mint, Burn, and Withdraw, then streams them to the LogProcessor.

LogProcessor

Receives event batches from LogListener and GapFiller. Parses blockchain events, matches them against tracked pools and wallets, and detects position flows (opens, closes, deposits, withdrawals). Writes results to the nmt_position and nmt_position_flow tables.

GapFiller

Periodically fills gaps in position data by fetching logs from the blockchain for blocks that may have been missed. Also runs backfill operations to recalculate position valuations at fixed intervals (every 24 hours).
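To illustrate the gap-detection idea, the sketch below computes which block ranges are missing from a sorted list of already-processed block numbers. How the real GapFiller tracks processed blocks is not described here, so the missing_block_ranges function and its inputs are assumptions made purely for illustration.

```rust
use std::ops::RangeInclusive;

/// Hypothetical helper: returns the block ranges inside `from..=to` that do
/// not appear in `processed`. `processed` must be sorted in ascending order;
/// duplicates and blocks outside the window are ignored.
pub fn missing_block_ranges(processed: &[u64], from: u64, to: u64) -> Vec<RangeInclusive<u64>> {
    let mut gaps = Vec::new();
    if from > to {
        return gaps;
    }
    // Lowest block number that has not been accounted for yet.
    let mut next_expected = from;
    for &block in processed {
        if block < next_expected {
            continue; // before the window, or a duplicate
        }
        if block > to {
            break; // past the window; the tail is handled below
        }
        if block > next_expected {
            gaps.push(next_expected..=block - 1);
        }
        match block.checked_add(1) {
            Some(next) => next_expected = next,
            None => return gaps, // block == u64::MAX, nothing can follow
        }
    }
    if next_expected <= to {
        gaps.push(next_expected..=to);
    }
    gaps
}
```

Each returned range could then be fetched from the chain as a log query and handed to the LogProcessor, the same way live events are.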

Price Fetching

Token prices are fetched on demand via a PriceFetcher utility, not by a background worker. When other workers (pool services, valuation backfills) need current prices, they call the PriceFetcher, which queries the CoinGecko Pro API.

Protocol Service Pattern

Every protocol follows the same service pattern, defined by the DefiService trait:

trait DefiService {
    // Fetch all pools for this protocol
    async fn index_pools(&self) -> Result<Vec<NmtPool>>;

    // Detect positions for a specific user
    async fn index_positions(&self, user: &Address) -> Result<Vec<NmtPosition>>;

    // Calculate the value of a position
    async fn valuate(&self, position: &NmtPosition) -> Result<Decimal>;
}

Each protocol implements this trait:

Service               Location                                          Pools Source
--------------------  ------------------------------------------------  ------------------------------
Uniswap V3/V4         workers/src/defi/services/uniswap_v3/             Krystal API
Uniswap V2            workers/src/defi/services/uniswap_v2/             Krystal API
Aave V3               workers/src/defi/services/aave/                   Aave GraphQL
Morpho                workers/src/defi/services/morpho/                 The Graph + headless Chrome
Yearn                 workers/src/defi/services/yearn/                  yDaemon API
Aerodrome             workers/src/defi/services/aerodrome/              On-chain (curated list)
Aerodrome Slipstream  workers/src/defi/services/aerodrome_slipstream/   On-chain

TVL Filters

Not every pool on every protocol is worth tracking. The backend applies minimum TVL (Total Value Locked) filters to avoid indexing dust pools:

Protocol              Min TVL    Rationale
--------------------  ---------  ------------------------
Aave V3               $0         All markets relevant
Yearn                 $0         Curated vaults
Uniswap V2            $100,000   Filter inactive pairs
Uniswap V3/V4         $500,000   Focus on liquid pools
Morpho                $300,000   MetaMorpho vaults only
Aerodrome Slipstream  $500,000   CL pools only
Aerodrome (basic)     $0         Curated list of 15 pools
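The thresholds in the table map naturally onto a per-protocol lookup plus a predicate. The following is a sketch under stated assumptions: the Protocol enum, min_tvl_usd, and passes_tvl_filter are hypothetical names, TVL is modelled as a plain f64 in USD, and treating a pool exactly at the threshold as passing is an assumption, since the boundary behaviour is not specified above.

```rust
/// Hypothetical enum covering the protocols in the TVL table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    AaveV3,
    Yearn,
    UniswapV2,
    UniswapV3,
    UniswapV4,
    Morpho,
    AerodromeSlipstream,
    AerodromeBasic,
}

/// Minimum TVL in USD a pool needs in order to be indexed (values from the table).
pub fn min_tvl_usd(protocol: Protocol) -> f64 {
    match protocol {
        Protocol::AaveV3 | Protocol::Yearn | Protocol::AerodromeBasic => 0.0,
        Protocol::UniswapV2 => 100_000.0,
        Protocol::Morpho => 300_000.0,
        Protocol::UniswapV3 | Protocol::UniswapV4 | Protocol::AerodromeSlipstream => 500_000.0,
    }
}

/// Assumption: a pool exactly at the threshold passes.
/// A NaN TVL never passes, because every comparison with NaN is false.
pub fn passes_tvl_filter(protocol: Protocol, tvl_usd: f64) -> bool {
    tvl_usd >= min_tvl_usd(protocol)
}
```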

APY Safety Caps

To filter out erroneous data, the backend enforces a maximum APY per protocol:

Protocol              Max APY
--------------------  -------
Most protocols        300%
Aerodrome Slipstream  2,000%

Any APY above these limits is discarded as likely incorrect.
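A minimal sketch of this check is shown below. The names (Protocol, max_apy_percent, checked_apy) are hypothetical, and APY is modelled as a percentage in an f64. Rejecting NaN and infinite values is an extra assumption added here, not something stated above.

```rust
/// Hypothetical two-way split: Slipstream has its own limit, everything else shares one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Protocol {
    AerodromeSlipstream,
    Other,
}

/// Maximum accepted APY, in percent (values from the table).
pub fn max_apy_percent(protocol: Protocol) -> f64 {
    match protocol {
        Protocol::AerodromeSlipstream => 2_000.0,
        Protocol::Other => 300.0,
    }
}

/// Returns the APY unchanged if it is within the limit, or None if it should
/// be discarded. Values above the limit are dropped, not clamped to it.
/// Assumption: non-finite values (NaN, infinity) are also discarded.
pub fn checked_apy(protocol: Protocol, apy_percent: f64) -> Option<f64> {
    if apy_percent.is_finite() && apy_percent <= max_apy_percent(protocol) {
        Some(apy_percent)
    } else {
        None
    }
}
```

Returning an Option keeps "no trustworthy APY" distinct from a real value of zero, so a caller cannot mistake a discarded reading for a pool that yields nothing.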

Concurrency Settings

Setting                   Default  Purpose
------------------------  -------  ---------------------------------
Ingestion concurrency     30       Parallel data fetch workers
Confirmation concurrency  5        Parallel blockchain confirmations
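To show what a concurrency limit means in practice, the sketch below runs a job over a list of items with at most `limit` threads in flight. It is illustrative only: ConcurrencySettings and run_bounded are hypothetical names, the real backend may well use async tasks instead of OS threads, and this version processes the input in fixed chunks rather than keeping a sliding window of workers busy.

```rust
use std::thread;

/// Hypothetical settings struct; the defaults are the values from the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ConcurrencySettings {
    pub ingestion: usize,
    pub confirmation: usize,
}

impl Default for ConcurrencySettings {
    fn default() -> Self {
        Self { ingestion: 30, confirmation: 5 }
    }
}

/// Runs `job` on every item with at most `limit` threads alive at once.
/// Items are processed in consecutive chunks of `limit`; results keep input order.
pub fn run_bounded<T, R, F>(items: &[T], limit: usize, job: F) -> Vec<R>
where
    T: Sync,
    R: Send,
    F: Fn(&T) -> R + Sync,
{
    let limit = limit.max(1); // a limit of 0 would make no progress
    let job = &job; // share the job by reference across threads
    let mut results = Vec::with_capacity(items.len());
    for chunk in items.chunks(limit) {
        // Scoped threads may borrow `chunk` and `job`; all are joined before
        // the scope returns, so the next chunk starts only after this one ends.
        let chunk_results: Vec<R> = thread::scope(|scope| {
            let handles: Vec<_> = chunk
                .iter()
                .map(|item| scope.spawn(move || job(item)))
                .collect();
            handles
                .into_iter()
                .map(|handle| handle.join().expect("worker thread panicked"))
                .collect()
        });
        results.extend(chunk_results);
    }
    results
}
```

With the defaults, an ingestion pass would call something like run_bounded(&pools, ConcurrencySettings::default().ingestion, fetch_pool), where pools and fetch_pool are placeholders for the caller's own data and fetch function.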
