Monitor data updates and build efficient incremental pipelines in Snowflake
Flipside data shares in Snowflake are continuously updated as blockchain data is produced. This page
shows you how to monitor freshness and build incremental data pipelines using Snowflake SQL.
Looking for latency targets? See the general Data Freshness
guide for detailed latency expectations by schema type and
blockchain.
Use this SQL query to check how current your data is:
```sql
-- Check latest block timestamp for a chain
SELECT
    MAX(block_number) AS latest_block,
    MAX(block_timestamp) AS latest_timestamp,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM ethereum.core.fact_blocks;
```
Interpretation:
Less than 1 hour behind: Normal for Core tables
1-6 hours behind: Normal for DeFi/NFT tables
Greater than 24 hours behind: Potential issue, contact support
Monitor data freshness across multiple table types:
```sql
-- Check freshness of different schemas
SELECT
    'Core: fact_blocks' AS table_name,
    MAX(block_timestamp) AS latest_data,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM ethereum.core.fact_blocks
UNION ALL
SELECT
    'Core: fact_transactions' AS table_name,
    MAX(block_timestamp) AS latest_data,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM ethereum.core.fact_transactions
UNION ALL
SELECT
    'DeFi: ez_dex_swaps' AS table_name,
    MAX(block_timestamp) AS latest_data,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM ethereum.defi.ez_dex_swaps
UNION ALL
SELECT
    'NFT: ez_nft_sales' AS table_name,
    MAX(block_timestamp) AS latest_data,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM ethereum.nft.ez_nft_sales
ORDER BY minutes_behind DESC;
```
```sql
-- Compare freshness across your mounted shares
SELECT
    'Ethereum' AS chain,
    MAX(block_timestamp) AS latest_data,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM ETHEREUM_CORE.ethereum.core.fact_blocks
UNION ALL
SELECT
    'Arbitrum' AS chain,
    MAX(block_timestamp) AS latest_data,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM ARBITRUM_CORE.arbitrum.core.fact_blocks
UNION ALL
SELECT
    'Solana' AS chain,
    MAX(block_timestamp) AS latest_data,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind
FROM SOLANA_CORE.solana.core.fact_blocks
ORDER BY minutes_behind;
```
_inserted_timestamp
The timestamp when the row was first inserted into Flipside’s database.
Clustered: ❌ No
Use for:
Identifying newly added data
Incremental data loads
Monitoring ingestion progress
```sql
-- Find data inserted in the last hour
SELECT *
FROM ethereum.core.fact_transactions
WHERE _inserted_timestamp >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
  AND block_timestamp >= CURRENT_DATE - 7; -- Still include block_timestamp for performance
```
Performance tip: Always include a block_timestamp filter even when filtering on
_inserted_timestamp to leverage clustering.
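Because `_inserted_timestamp` records when a row landed in Flipside's database, it can serve as a high-water mark for incremental pipelines: each run pulls only rows added since the previous run and merges them into a local table. The sketch below illustrates that pattern; `analytics.eth_transactions` and `pipeline_state` are hypothetical objects in your own database, and the selected columns assume standard `fact_transactions` fields.

```sql
-- Incremental load sketch (illustrative table names):
-- pull only rows inserted since the last recorded high-water mark.
MERGE INTO analytics.eth_transactions AS target
USING (
    SELECT tx_hash, block_number, block_timestamp, from_address, to_address
    FROM ethereum.core.fact_transactions
    WHERE _inserted_timestamp > (
            SELECT COALESCE(MAX(last_loaded), '1970-01-01'::TIMESTAMP_NTZ)
            FROM pipeline_state
            WHERE pipeline = 'eth_transactions'
          )
      AND block_timestamp >= CURRENT_DATE - 7  -- clustering filter for performance
) AS source
ON target.tx_hash = source.tx_hash
WHEN NOT MATCHED THEN
    INSERT (tx_hash, block_number, block_timestamp, from_address, to_address)
    VALUES (source.tx_hash, source.block_number, source.block_timestamp,
            source.from_address, source.to_address);

-- Record the new high-water mark for the next run
UPDATE pipeline_state
SET last_loaded = (SELECT MAX(_inserted_timestamp) FROM ethereum.core.fact_transactions)
WHERE pipeline = 'eth_transactions';
```

Using MERGE keyed on `tx_hash` keeps the load idempotent: re-running after a partial failure will not create duplicate rows.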
Check for gaps in the block sequence:

```sql
-- Identify missing blocks in sequence
WITH block_sequence AS (
    SELECT
        block_number,
        LAG(block_number) OVER (ORDER BY block_number) AS prev_block
    FROM ethereum.core.fact_blocks
    WHERE block_timestamp >= CURRENT_DATE - 1
)
SELECT
    prev_block AS last_block_before_gap,
    block_number AS first_block_after_gap,
    block_number - prev_block AS gap_size
FROM block_sequence
WHERE block_number - prev_block > 1
ORDER BY gap_size DESC;
```
Cross-check transaction counts between tables:

```sql
-- Verify transaction counts match between fact_blocks and fact_transactions
WITH block_counts AS (
    SELECT block_number, tx_count AS reported_tx_count
    FROM ethereum.core.fact_blocks
    WHERE block_timestamp >= CURRENT_DATE - 1
),
actual_counts AS (
    SELECT block_number, COUNT(*) AS actual_tx_count
    FROM ethereum.core.fact_transactions
    WHERE block_timestamp >= CURRENT_DATE - 1
    GROUP BY block_number
)
SELECT
    b.block_number,
    b.reported_tx_count,
    COALESCE(a.actual_tx_count, 0) AS actual_tx_count,
    b.reported_tx_count - COALESCE(a.actual_tx_count, 0) AS difference
FROM block_counts b
LEFT JOIN actual_counts a ON b.block_number = a.block_number
WHERE b.reported_tx_count != COALESCE(a.actual_tx_count, 0)
ORDER BY ABS(b.reported_tx_count - COALESCE(a.actual_tx_count, 0)) DESC
LIMIT 20;
```
Set up Snowflake tasks to monitor freshness automatically:
```sql
-- Create a task to check freshness hourly
CREATE OR REPLACE TASK monitor_ethereum_freshness
    WAREHOUSE = compute_wh
    SCHEDULE = 'USING CRON 0 * * * * America/Los_Angeles' -- Every hour
AS
INSERT INTO freshness_alerts
SELECT
    CURRENT_TIMESTAMP() AS check_time,
    'ethereum.core.fact_blocks' AS table_name,
    MAX(block_timestamp) AS latest_block_time,
    DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) AS minutes_behind,
    CASE
        WHEN DATEDIFF('minute', MAX(block_timestamp), CURRENT_TIMESTAMP()) > 120
        THEN 'ALERT: Data is stale'
        ELSE 'OK'
    END AS status
FROM ethereum.core.fact_blocks;

-- Resume the task to start monitoring
ALTER TASK monitor_ethereum_freshness RESUME;
```
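The task inserts into a `freshness_alerts` table in your own database, which must exist before the task first runs. A minimal definition matching the columns the task produces might look like this (the table name and types are illustrative):

```sql
-- Hypothetical target table for the monitoring task
CREATE TABLE IF NOT EXISTS freshness_alerts (
    check_time        TIMESTAMP_NTZ,  -- when the check ran
    table_name        VARCHAR,        -- table being monitored
    latest_block_time TIMESTAMP_NTZ,  -- newest block_timestamp observed
    minutes_behind    NUMBER,         -- lag at check time
    status            VARCHAR         -- 'OK' or 'ALERT: Data is stale'
);
```

You can then query `freshness_alerts` for rows where `status != 'OK'` to drive downstream notifications.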