
Materialized Views

Materialized views in Timeplus Proton are continuous queries that transform and route streaming data in real time. Unlike traditional databases, where materialized views are periodic snapshots, Proton’s materialized views process data incrementally as it arrives.

Overview

Key characteristics:
  • Continuous execution: Run 24/7, processing events as they arrive
  • Incremental updates: Process only new data, not full refreshes
  • Low latency: Transform data in flight with millisecond-level latency
  • Stateful aggregations: Maintain running totals, windows, joins
  • Multiple targets: Write to streams, external streams, or external tables
Materialized views act like triggers in traditional databases, but execute continuously on streaming data.

Basic Syntax

CREATE MATERIALIZED VIEW [IF NOT EXISTS] view_name 
INTO target_stream AS
SELECT ...
FROM source_stream
[WHERE ...]
[GROUP BY ...];
Components:
  • view_name: Unique identifier for the materialized view
  • INTO target_stream: Destination stream (must exist)
  • SELECT ... FROM: Continuous transformation query

Simple Examples

Filtering and Routing

-- Create target stream
CREATE STREAM high_value_events (
  user_id uint64,
  amount decimal(10,2),
  timestamp datetime64(3)
);

-- Filter and route high-value events
CREATE MATERIALIZED VIEW mv_high_value INTO high_value_events AS
SELECT 
  user_id,
  amount,
  timestamp
FROM transactions
WHERE amount > 1000;
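To make the per-event semantics concrete, here is a small Python model of mv_high_value. It is not Proton code: it only sketches the idea that every arriving row is tested against the WHERE clause once and matching rows are appended to the target. The Transaction class and run_view helper are hypothetical stand-ins for the transactions and high_value_events streams.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass(frozen=True)
class Transaction:
    user_id: int
    amount: Decimal
    timestamp_ms: int  # hypothetical stand-in for datetime64(3)


def mv_high_value(event: Transaction) -> Optional[Transaction]:
    """WHERE amount > 1000, then project user_id, amount, timestamp (all columns here)."""
    return event if event.amount > Decimal("1000") else None


def run_view(source, view, target):
    # Each arriving event is evaluated exactly once; matching rows are appended to the target.
    for event in source:
        row = view(event)
        if row is not None:
            target.append(row)


high_value_events = []
run_view(
    [
        Transaction(1, Decimal("250.00"), 0),
        Transaction(2, Decimal("1500.00"), 1),
        Transaction(3, Decimal("1000.00"), 2),  # not strictly > 1000, so dropped
    ],
    mv_high_value,
    high_value_events,
)
```

Note that the boundary value 1000.00 is excluded because the condition is a strict `>`.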

Data Transformation

CREATE STREAM processed_logs (
  timestamp datetime64(3),
  level string,
  service string,
  message string
);

CREATE MATERIALIZED VIEW mv_parse_logs INTO processed_logs AS
SELECT 
  to_datetime64(raw:timestamp, 3) as timestamp,
  raw:level::string as level,
  raw:service::string as service,
  raw:message::string as message
FROM raw_logs
WHERE raw:level IN ('ERROR', 'WARN');
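The same extract-and-filter logic can be modelled in Python, assuming each raw_logs row is a JSON object with timestamp, level, service and message keys (that schema is an assumption; the original only shows the accessor paths). This sketch parses one line, drops anything that is not ERROR or WARN, and returns typed fields the way mv_parse_logs does.

```python
import json
from datetime import datetime
from typing import Optional

KEEP_LEVELS = {"ERROR", "WARN"}


def mv_parse_logs(raw: str) -> Optional[dict]:
    """Extract typed fields from one JSON log line; drop anything not ERROR/WARN."""
    doc = json.loads(raw)
    if doc.get("level") not in KEEP_LEVELS:
        return None  # filtered out, like the WHERE clause
    return {
        "timestamp": datetime.fromisoformat(doc["timestamp"]),
        "level": str(doc["level"]),
        "service": str(doc["service"]),
        "message": str(doc["message"]),
    }


raw_logs = [
    '{"timestamp": "2024-01-01 00:00:00.000", "level": "INFO", "service": "api", "message": "ok"}',
    '{"timestamp": "2024-01-01 00:00:01.250", "level": "ERROR", "service": "api", "message": "boom"}',
]
processed_logs = [row for row in map(mv_parse_logs, raw_logs) if row is not None]
```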

Enrichment

-- Join streaming events with dimension table
CREATE MATERIALIZED VIEW mv_enriched INTO enriched_events AS
SELECT 
  e.event_id,
  e.user_id,
  u.user_name,
  u.country,
  e.event_type,
  e.timestamp
FROM events e
LEFT JOIN table(users) u ON e.user_id = u.user_id;
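A LEFT JOIN keeps every event even when no matching user exists. The Python sketch below models the users table as a static dict snapshot (an assumption made for illustration; it does not describe how or when Proton refreshes the right-hand side). It marks missing dimension fields with None, whereas the SQL engine may fill type defaults such as an empty string instead.

```python
from typing import Optional

# Hypothetical snapshot of the users dimension table, keyed by user_id.
users = {
    1: {"user_name": "alice", "country": "DE"},
    2: {"user_name": "bob", "country": "US"},
}


def enrich(event: dict, dim: dict) -> dict:
    """LEFT JOIN model: every event is emitted; unmatched events get empty user fields."""
    match: Optional[dict] = dim.get(event["user_id"])
    return {
        "event_id": event["event_id"],
        "user_id": event["user_id"],
        "user_name": match["user_name"] if match else None,
        "country": match["country"] if match else None,
        "event_type": event["event_type"],
    }


events = [
    {"event_id": 10, "user_id": 1, "event_type": "click"},
    {"event_id": 11, "user_id": 99, "event_type": "view"},  # no such user
]
enriched_events = [enrich(e, users) for e in events]
```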

Aggregations

Running Aggregates

CREATE STREAM metrics (
  service string,
  request_count uint64,
  error_count uint64,
  avg_latency float64,
  timestamp datetime64(3)
);

-- Continuous aggregation by service
CREATE MATERIALIZED VIEW mv_metrics INTO metrics AS
SELECT 
  service,
  count() as request_count,
  count_if(status >= 500) as error_count,
  avg(latency_ms) as avg_latency,
  now64() as timestamp
FROM access_logs
GROUP BY service;
Aggregations in materialized views are incremental: each new batch of data updates the running totals instead of recomputing them from scratch.
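A minimal Python model of what "incremental" means for mv_metrics: per service, the state holds just enough to answer count(), count_if(status >= 500) and avg(latency_ms), and each batch is folded into that state without re-reading earlier rows. The ServiceStats class is illustrative only; Proton does not expose its internal aggregate state in this form.

```python
from collections import defaultdict


class ServiceStats:
    """Running state for one GROUP BY key (service)."""

    def __init__(self):
        self.request_count = 0   # count()
        self.error_count = 0     # count_if(status >= 500)
        self.latency_sum = 0.0   # avg() keeps a running sum ...

    @property
    def avg_latency(self):
        return self.latency_sum / self.request_count  # ... divided by the count


def apply_batch(state, batch):
    """Fold only the new rows into the existing state; earlier rows are never re-read."""
    for row in batch:
        s = state[row["service"]]
        s.request_count += 1
        s.error_count += int(row["status"] >= 500)
        s.latency_sum += row["latency_ms"]


state = defaultdict(ServiceStats)
apply_batch(state, [{"service": "api", "status": 200, "latency_ms": 10.0},
                    {"service": "api", "status": 503, "latency_ms": 30.0}])
apply_batch(state, [{"service": "api", "status": 200, "latency_ms": 20.0}])
```

After both batches the state for "api" matches a full recomputation over all three rows.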

Windowed Aggregations

-- 1-minute tumbling window aggregation
CREATE MATERIALIZED VIEW mv_window_metrics INTO window_metrics AS
SELECT 
  window_start,
  window_end,
  service,
  count() as request_count,
  avg(latency_ms) as avg_latency,
  max(latency_ms) as max_latency
FROM tumble(access_logs, 1m)
GROUP BY window_start, window_end, service;
See Windows for detailed windowing documentation.
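The window assignment behind tumble(access_logs, 1m) can be sketched in Python. This is a batch model under two assumptions: windows are aligned to multiples of the window size from the epoch, and the question of when Proton emits each window's result is ignored. Each event lands in exactly one half-open window [start, start + 1m).

```python
from collections import defaultdict

WINDOW_MS = 60_000  # 1m tumbling window; timestamps are epoch milliseconds


def window_bounds(ts_ms):
    """Each event belongs to exactly one window: [start, start + WINDOW_MS)."""
    start = ts_ms - ts_ms % WINDOW_MS
    return start, start + WINDOW_MS


def tumble_aggregate(rows):
    groups = defaultdict(list)
    for r in rows:
        window_start, window_end = window_bounds(r["ts"])
        groups[(window_start, window_end, r["service"])].append(r["latency_ms"])
    return {
        key: {"request_count": len(v), "avg_latency": sum(v) / len(v), "max_latency": max(v)}
        for key, v in groups.items()
    }


rows = [
    {"ts": 5_000, "service": "api", "latency_ms": 10.0},
    {"ts": 59_999, "service": "api", "latency_ms": 30.0},
    {"ts": 60_000, "service": "api", "latency_ms": 50.0},  # first event of the next window
]
window_metrics = tumble_aggregate(rows)
```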

ETL Pipelines

Kafka to Kafka ETL

From examples/ecommerce/etl.sql:
-- Read from source Kafka topic
CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS 
  type='kafka',
  brokers='redpanda:9092',
  topic='owlshop-frontend-events';

-- Write to destination Kafka topic
CREATE EXTERNAL STREAM target(
  _tp_time datetime64(3), 
  url string, 
  method string, 
  ip string
) 
SETTINGS 
  type='kafka', 
  brokers='redpanda:9092', 
  topic='masked-fe-event', 
  data_format='JSONEachRow';

-- Real-time ETL with data masking
CREATE MATERIALIZED VIEW mv INTO target AS 
SELECT 
  now64() AS _tp_time, 
  raw:requestedUrl AS url, 
  raw:method AS method, 
  lower(hex(md5(raw:ipAddress))) AS ip  -- Hash PII
FROM frontend_events;
Pipeline flow:
  1. The materialized view starts consuming from frontend_events
  2. Each event is transformed (fields extracted, IP address hashed)
  3. Results are written to the target external stream (Kafka topic masked-fe-event)
  4. End-to-end latency: ~10ms
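The masking step can be checked outside Proton. In the SQL, md5 produces 16 raw bytes, hex renders them as uppercase hexadecimal, and lower converts the result to lowercase, so the output should equal Python's hashlib MD5 hex digest of the same UTF-8 string. The sketch below assumes the raw payload is JSON with requestedUrl, method and ipAddress keys, as the accessor paths suggest.

```python
import hashlib
import json


def mask_event(raw):
    """Model of the ETL view: keep url/method, replace the IP with its MD5 hex digest."""
    doc = json.loads(raw)
    return {
        "url": doc["requestedUrl"],
        "method": doc["method"],
        # hexdigest() is already lowercase, matching lower(hex(md5(...))) in the SQL
        "ip": hashlib.md5(doc["ipAddress"].encode("utf-8")).hexdigest(),
    }


masked = mask_event('{"requestedUrl": "/cart", "method": "GET", "ipAddress": "10.0.0.1"}')
```

Because the IPv4 address space is small, an unsalted MD5 of an address can be reversed by brute force. Treat this as pseudonymization rather than anonymization.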

Stream to ClickHouse

-- External table pointing to ClickHouse
CREATE EXTERNAL TABLE ch_analytics
SETTINGS 
  type='clickhouse',
  address='clickhouse.example.com:9000',
  user='writer',
  password='secret',
  database='analytics',
  table='hourly_metrics';

-- Aggregate and write to ClickHouse
CREATE MATERIALIZED VIEW mv_to_ch INTO ch_analytics AS
SELECT 
  to_start_of_hour(timestamp) as hour,
  service,
  count() as total_requests,
  count_if(status >= 500) as errors,
  avg(latency_ms) as avg_latency
FROM access_logs
GROUP BY hour, service;

Change Data Capture (CDC)

Materialized views are well suited to processing CDC streams from databases.

Full CDC Example

From examples/cdc/cdc.sql:
-- External stream from Debezium
CREATE EXTERNAL STREAM customers_cdc(raw string)
SETTINGS 
  type='kafka',
  brokers='redpanda:9092',
  topic='dbserver1.inventory.customers';

-- Changelog stream to maintain current state
CREATE STREAM customers(
  id int, 
  first_name string, 
  last_name string, 
  email string
) 
PRIMARY KEY id 
SETTINGS mode='changelog_kv', version_column='_tp_time';

-- Process INSERT operations (op='c' for create)
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS 
SELECT 
  to_time(raw:payload.ts_ms) AS _tp_time,
  raw:payload.after.id::int AS id,
  raw:payload.after.first_name AS first_name,
  raw:payload.after.last_name AS last_name,
  raw:payload.after.email AS email,
  1::int8 as _tp_delta  -- +1 for insert
FROM customers_cdc 
WHERE raw:payload.op='c' 
SETTINGS seek_to='earliest';

-- Process UPDATE operations (op='u')
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS 
WITH cdc_changes AS (
  SELECT 
    ts_ms,
    array_join(changes) AS change,
    change.1 as val,
    change.2 AS _tp_delta
  FROM (
    SELECT 
      to_time(raw:payload.ts_ms) AS ts_ms,
      [(raw:payload.before, -1::int8), (raw:payload.after, 1::int8)] AS changes
    FROM customers_cdc
    WHERE raw:payload.op = 'u' 
    SETTINGS seek_to = 'earliest'
  )
)
SELECT 
  ts_ms AS _tp_time,
  val:id::int32 AS id,
  val:first_name AS first_name,
  val:last_name AS last_name,
  val:email AS email,
  _tp_delta
FROM cdc_changes;

-- Process DELETE operations (op='d')
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS 
SELECT 
  to_time(raw:payload.ts_ms) AS _tp_time,
  raw:payload.before.id::int AS id,
  raw:payload.before.first_name AS first_name,
  raw:payload.before.last_name AS last_name,
  raw:payload.before.email AS email,
  -1::int8 as _tp_delta  -- -1 for delete
FROM customers_cdc 
WHERE raw:payload.op='d' 
SETTINGS seek_to='earliest';
The _tp_delta column controls changelog semantics:
  • INSERT: Emit row with _tp_delta=1
  • UPDATE: Emit old row with _tp_delta=-1, then new row with _tp_delta=1
  • DELETE: Emit row with _tp_delta=-1
Proton’s changelog engine automatically maintains the current state by applying deltas.
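A simplified Python model of how the three views' output combines into current state, keyed by the primary key id: a +1 row upserts and a -1 row retracts. The model applies rows in arrival order and ignores the version_column ordering (_tp_time) that the real changelog_kv stream uses to resolve out-of-order updates. The row shapes are trimmed-down, hypothetical customers rows.

```python
def apply_delta(state, row, delta):
    """Apply one changelog row keyed by primary key `id`: +1 upserts, -1 retracts."""
    key = row["id"]
    if delta == 1:
        state[key] = row
    elif delta == -1:
        state.pop(key, None)
    else:
        raise ValueError(f"unexpected _tp_delta {delta}")


customers = {}
changelog = [
    ({"id": 1, "email": "a@old.example"}, 1),   # op='c'
    ({"id": 1, "email": "a@old.example"}, -1),  # op='u', before image
    ({"id": 1, "email": "a@new.example"}, 1),   # op='u', after image
    ({"id": 2, "email": "b@example"}, 1),       # op='c'
    ({"id": 2, "email": "b@example"}, -1),      # op='d'
]
for row, delta in changelog:
    apply_delta(customers, row, delta)
```

The update is emitted as a retraction followed by an insertion, so after replay only customer 1 remains, with the new email.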

Advanced Patterns

Multi-Target Views

-- Fan out one source to multiple destinations: one materialized view per target

-- High-priority events to alerting system
CREATE MATERIALIZED VIEW mv_alerts INTO alert_stream AS
SELECT * FROM events WHERE priority = 'high';

-- All events to data lake
CREATE MATERIALIZED VIEW mv_archive INTO s3_sink AS
SELECT * FROM events;

-- Metrics to monitoring
CREATE MATERIALIZED VIEW mv_event_metrics INTO metrics_stream AS
SELECT 
  window_start,
  count() as event_count
FROM tumble(events, 1m)
GROUP BY window_start;

Deduplication

-- Deduplicate events based on unique ID
CREATE MATERIALIZED VIEW mv_dedup INTO unique_events AS
SELECT *
FROM (
  SELECT 
    *,
    row_number() OVER (PARTITION BY event_id ORDER BY _tp_time) as rn
  FROM raw_events
)
WHERE rn = 1;
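In streaming terms, keeping rn = 1 means emitting the first event seen for each event_id and dropping later ones. The Python sketch below assumes events arrive in _tp_time order. It also shows the cost: the set of seen IDs grows with every distinct key, so without a bound or expiry the dedup state is unbounded.

```python
def dedup_first_seen(events, key="event_id"):
    """Keep the first event per key, like filtering on rn = 1.

    Assumes arrival order equals _tp_time order. `seen` grows with every
    distinct key, so this state is unbounded unless keys are bounded or expired.
    """
    seen = set()
    for event in events:
        k = event[key]
        if k not in seen:
            seen.add(k)
            yield event


raw_events = [
    {"event_id": "a", "payload": 1},
    {"event_id": "b", "payload": 2},
    {"event_id": "a", "payload": 3},  # duplicate id, dropped
]
unique_events = list(dedup_first_seen(raw_events))
```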

Sessionization

-- Group events into sessions with 30-minute timeout
CREATE MATERIALIZED VIEW mv_sessions INTO user_sessions AS
SELECT 
  user_id,
  session_start(user_id, timestamp, INTERVAL 30 MINUTE) as session_id,
  min(timestamp) as session_start,
  max(timestamp) as session_end,
  count() as event_count
FROM user_events
GROUP BY user_id, session_id;
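The underlying idea of a 30-minute session timeout is gap-based splitting: a user's next event starts a new session when it arrives more than 30 minutes after the previous one. This Python batch model computes the same per-session columns as the view (start, end, count). It assumes timestamps in seconds and assumes that a gap exactly equal to the timeout stays in the same session. A streaming engine must additionally decide when a session is closed.

```python
from collections import defaultdict

TIMEOUT_S = 30 * 60  # 30-minute inactivity gap


def sessionize(events):
    """Split each user's events into sessions whenever the gap exceeds TIMEOUT_S.

    Returns (user_id, session_start, session_end, event_count) tuples.
    """
    by_user = defaultdict(list)
    for user_id, ts in events:
        by_user[user_id].append(ts)
    sessions = []
    for user_id, stamps in by_user.items():
        stamps.sort()
        start = prev = stamps[0]
        count = 1
        for ts in stamps[1:]:
            if ts - prev > TIMEOUT_S:
                sessions.append((user_id, start, prev, count))  # close current session
                start, count = ts, 0
            count += 1
            prev = ts
        sessions.append((user_id, start, prev, count))
    return sessions


user_sessions = sessionize([("u1", 0), ("u1", 600), ("u2", 100), ("u1", 4000)])
```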

Stateful Processing

Materialized views maintain state for aggregations:
  1. State storage: Aggregation state is stored in memory or on disk, depending on the storage mode.
  2. Incremental updates: New data is merged into the existing state using merge functions:
     • count(): Add to the running total
     • sum(): Add to the running sum
     • avg(): Update the running sum and count; the average is sum / count
     • min()/max(): Compare and update
  3. State emission: The updated state is emitted to the target stream on each batch.
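The merge rules above can be sketched as a small mergeable state in Python. This is an illustration of the technique, not Proton's internal representation. The key point is that avg() must carry (sum, count): merging partial averages by averaging them gives the wrong answer.

```python
import math
from dataclasses import dataclass


@dataclass
class AggState:
    count: int = 0
    total: float = 0.0
    minimum: float = math.inf
    maximum: float = -math.inf

    def add(self, x):
        self.count += 1
        self.total += x
        self.minimum = min(self.minimum, x)
        self.maximum = max(self.maximum, x)

    def merge(self, other):
        """Combine two partial states without revisiting raw rows."""
        return AggState(
            self.count + other.count,
            self.total + other.total,
            min(self.minimum, other.minimum),
            max(self.maximum, other.maximum),
        )

    @property
    def avg(self):
        return self.total / self.count  # derived from (sum, count)


a, b = AggState(), AggState()
for x in (1.0, 2.0, 3.0):
    a.add(x)
b.add(10.0)
merged = a.merge(b)
```

Here merged.avg is 16 / 4 = 4.0, while naively averaging the two partial averages (2.0 and 10.0) would give 6.0.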

State Size Management

-- Use windows to bound state size
CREATE MATERIALIZED VIEW mv_bounded INTO metrics AS
SELECT 
  window_start,
  device,
  avg(temperature) as avg_temp
FROM tumble(sensors, 5m)
GROUP BY window_start, device;
-- State cleared after each window
Unbounded aggregations (global GROUP BY without windows) accumulate state indefinitely. Use windowing or periodic cleanup for production.
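Why windows bound state can be seen in a small Python model of mv_bounded: state exists only for windows that are still open, and a window's entry is emitted and deleted once event time passes its end. The closing rule is a simplification (the watermark is just the maximum event time seen, with no allowed lateness, and late events are dropped). It is not a description of Proton's watermark behaviour.

```python
from collections import defaultdict

WINDOW_MS = 5 * 60_000  # 5m tumbling window; timestamps are epoch milliseconds


class BoundedWindowAgg:
    """avg(temperature) per (window_start, device); closed windows are emitted, then dropped."""

    def __init__(self):
        self.state = defaultdict(lambda: [0.0, 0])  # (window_start, device) -> [sum, count]
        self.watermark = 0  # simplification: max event time seen so far

    def add(self, ts_ms, device, temperature):
        start = ts_ms - ts_ms % WINDOW_MS
        if start + WINDOW_MS <= self.watermark:
            return []  # window already closed: this sketch drops late events
        acc = self.state[(start, device)]
        acc[0] += temperature
        acc[1] += 1
        self.watermark = max(self.watermark, ts_ms)
        return self._emit_closed()

    def _emit_closed(self):
        out = []
        for key in sorted(k for k in self.state if k[0] + WINDOW_MS <= self.watermark):
            total, n = self.state.pop(key)  # popping keeps memory tied to open windows only
            out.append((key[0], key[1], total / n))
        return out


agg = BoundedWindowAgg()
agg.add(0, "d1", 20.0)
agg.add(60_000, "d1", 22.0)
emitted = agg.add(300_000, "d1", 30.0)  # event time reaches 5m, closing window [0, 300000)
```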

Performance Considerations

Throughput

-- Efficient: Simple transformation
CREATE MATERIALIZED VIEW mv_fast INTO target AS
SELECT 
  user_id,
  event_type,
  timestamp
FROM source
WHERE event_type = 'purchase';

Memory Usage

Factors affecting memory:
  • Cardinality: Number of unique GROUP BY keys
  • State complexity: Simple counts vs. complex aggregations
  • Window size: Larger windows = more state
  • Target stream: Writing to external streams uses less memory than internal streams
Optimization tips:
  1. Use windowing to bound state
  2. Choose appropriate GROUP BY granularity
  3. Use an EMIT clause (for example EMIT PERIODIC 5s) to control output frequency
  4. Consider pre-aggregation for very high cardinality

Latency

Indicative end-to-end latencies (actual figures depend on hardware, data rate, and query shape):
  • Simple filter/transform: 1-5ms
  • Aggregation (low cardinality): 5-20ms
  • Aggregation (high cardinality): 20-100ms
  • Complex joins: 50-200ms

Monitoring

View Status

-- List all materialized views
SELECT * FROM system.materialized_views;

-- Check view health
SELECT 
  name,
  target_stream,
  status,
  last_exception
FROM system.materialized_views
WHERE database = 'default';

Metrics

-- View processing metrics
SELECT 
  view_name,
  rows_read,
  rows_written,
  bytes_read,
  elapsed_time
FROM system.query_log
WHERE query_kind = 'MaterializedView'
  AND event_time >= now() - INTERVAL 1 HOUR;

Management

Create or Replace

-- Atomic replacement (drops and recreates)
CREATE OR REPLACE MATERIALIZED VIEW mv_metrics INTO metrics AS
SELECT 
  service,
  count() as requests
FROM logs
GROUP BY service;

Pause and Resume

-- Pause materialized view
DETACH VIEW mv_metrics;

-- Resume materialized view  
ATTACH VIEW mv_metrics;

Drop View

-- Drop materialized view (target stream remains)
DROP VIEW mv_metrics;

-- Drop if exists
DROP VIEW IF EXISTS mv_metrics;
Dropping a materialized view does NOT drop the target stream. Data already written to the target stream is preserved.

Error Handling

Invalid Data

-- Use try_cast to handle malformed data
CREATE MATERIALIZED VIEW mv_safe INTO clean_data AS
SELECT 
  try_cast(raw:user_id as uint64) as user_id,
  try_cast(raw:amount as decimal(10,2)) as amount,
  timestamp
FROM raw_stream
WHERE user_id IS NOT NULL;  -- Filter out failed casts

Dead Letter Queue

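-- Route failed records to a dead letter queue (DLQ)
CREATE STREAM dlq (
  raw_data string,
  error_message string,
  timestamp datetime64(3)
);

-- Valid records: id parses as uint64
CREATE MATERIALIZED VIEW mv_valid INTO target AS
SELECT 
  try_cast(raw:id as uint64) as id,
  raw:data as data
FROM source
WHERE id IS NOT NULL;

-- Invalid records: a second view on the same source writes them to the DLQ
CREATE MATERIALIZED VIEW mv_dlq INTO dlq AS
SELECT 
  raw as raw_data,
  'Invalid ID' as error_message,
  now64() as timestamp
FROM source
WHERE try_cast(raw:id as uint64) IS NULL;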
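The routing rule for a dead letter queue is that each record goes to exactly one output: the target if its id parses, the DLQ otherwise. In this Python sketch, try_parse_uint64 is a hypothetical stand-in for try_cast(... as uint64). Its accepted input format is an assumption and may differ from Proton's cast rules.

```python
from typing import Optional


def try_parse_uint64(value) -> Optional[int]:
    """Hypothetical stand-in for try_cast(... as uint64): None instead of an error."""
    try:
        n = int(str(value))
    except ValueError:
        return None
    return n if 0 <= n < 2**64 else None


def route(records):
    """Send each record to exactly one output: target if the id parses, DLQ otherwise."""
    target, dlq = [], []
    for raw in records:
        rid = try_parse_uint64(raw.get("id"))
        if rid is None:
            dlq.append({"raw_data": raw, "error_message": "Invalid ID"})
        else:
            target.append({"id": rid, "data": raw.get("data")})
    return target, dlq


target, dlq = route([
    {"id": "42", "data": "x"},
    {"id": "abc", "data": "y"},  # not numeric
    {"data": "z"},               # missing id
    {"id": "-5"},                # negative, outside uint64
])
```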

Best Practices

  1. Design for incremental processing: Ensure transformations work on partial batches of data, not on the full data set.
  2. Use appropriate windows: Bound state size with tumbling or hopping windows for aggregations.
  3. Monitor performance: Track view latency and throughput metrics regularly.
  4. Test with realistic data: Validate with production-like volumes before deploying.
  5. Plan for failures: Implement error handling and dead letter queues for critical pipelines.

Comparison with Other Systems

Feature      | Proton MV    | ksqlDB     | Flink SQL          | Traditional MV
Execution    | Continuous   | Continuous | Continuous         | Periodic refresh
Latency      | Milliseconds | Seconds    | Seconds            | Minutes/hours
State        | In-process   | RocksDB    | RocksDB            | Database
Dependencies | None         | Kafka, JVM | Flink cluster, JVM | Database
Language     | SQL          | SQL        | SQL                | SQL

Next Steps

  • Windows: Time-based aggregations and windowing
  • External Streams: Connect to Kafka and other sources
  • Query Syntax: Complete SQL syntax reference
  • CDC Tutorial: Change Data Capture patterns