Timeplus Proton supports various types of joins for combining data from multiple streams and tables, with special semantics for streaming joins.

Join Types

Stream-to-Stream Joins

Join two streams together for real-time correlation:
SELECT 
  orders.order_id,
  orders.amount,
  payments.payment_method,
  payments.payment_time
FROM orders
INNER JOIN payments ON orders.order_id = payments.order_id;

Stream-to-Table Joins

Join a stream with historical data using the table() function:
SELECT 
  events.event_id,
  events.user_id,
  users.name,
  users.email
FROM events
LEFT JOIN table(users) ON events.user_id = users.user_id;
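
To try the query above, both streams must exist and users needs some rows. The following is a minimal setup sketch: only the column names come from the query, while the column types and sample rows are hypothetical and chosen for illustration.

```sql
-- Hypothetical schemas; only the column names are taken from the query above.
CREATE STREAM IF NOT EXISTS users (user_id string, name string, email string);
CREATE STREAM IF NOT EXISTS events (event_id string, user_id string);

-- Load the reference rows first so that table(users) has data to read.
INSERT INTO users (user_id, name, email) VALUES ('u1', 'Ada', 'ada@example.com');

-- Start the join query in one session, then send an event from another session.
INSERT INTO events (event_id, user_id) VALUES ('e1', 'u1');
```

With this data in place, the join would be expected to emit event e1 enriched with the name and email stored for u1.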

JOIN Syntax

INNER JOIN

Return only matching records from both sides:
SELECT 
  A.id,
  A.name,
  B.value
FROM left_stream A
INNER JOIN right_stream B ON A.id = B.id;

LEFT JOIN

Return all records from the left stream, along with matching records from the right stream:
SELECT 
  A.id,
  A.name,
  B.value
FROM left_stream A
LEFT JOIN right_stream B ON A.id = B.id;

If a row from left_stream has no match in right_stream, the columns from B are NULL for that row.

RIGHT JOIN

Return all records from the right stream, along with matching records from the left stream:
SELECT 
  A.id,
  A.name,
  B.value
FROM left_stream A
RIGHT JOIN right_stream B ON A.id = B.id;

FULL OUTER JOIN

Return all records from both streams, paired wherever the join condition matches:
SELECT 
  A.id,
  A.name,
  B.id as b_id,
  B.type,
  B.value
FROM left_stream A
FULL OUTER JOIN right_stream B ON A.id = B.id;

Streaming Join Semantics

Time-based Correlation

Stream-to-stream joins correlate events based on event time:
-- Join orders with payments whose event times are within 5 minutes of each other
SELECT 
  o.order_id,
  o.amount,
  p.payment_method
FROM orders o
INNER JOIN payments p ON o.order_id = p.order_id
WHERE o._tp_time BETWEEN p._tp_time - INTERVAL 5 MINUTE 
                     AND p._tp_time + INTERVAL 5 MINUTE;

Join State Management

Streaming joins maintain internal state:
  • Events from both streams are buffered
  • Matches are emitted as correlations are found
  • State is retained based on time or memory limits
  • Old events may be evicted from join state
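
One way to bound the state described above is to limit how far apart in time two matching events may be. The sketch below assumes that the date_diff_within function from the Timeplus function reference is available in your Proton version. The 10-second gap is an arbitrary value for illustration, and the streams are the orders and payments streams used earlier.

```sql
-- Match an order with a payment only when their event times are at most
-- 10 seconds apart, so events older than that need not stay in join state.
SELECT
  o.order_id,
  o.amount,
  p.payment_method
FROM orders o
INNER JOIN payments p
  ON o.order_id = p.order_id
  AND date_diff_within(10s, o._tp_time, p._tp_time);
```

A smaller gap keeps less state but can miss late-arriving matches, so the value should reflect how far apart related events are expected to arrive.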

Table Join for Enrichment

Static Reference Data

Enrich streams with static reference data:
-- Join stream with reference table
SELECT 
  orders.order_id,
  orders.product_id,
  products.name,
  products.category,
  orders.quantity
FROM orders
LEFT JOIN table(products) ON orders.product_id = products.product_id;

Dictionary Lookups

Join with dictionaries for fast enrichment:
SELECT 
  events.event_id,
  events.country_code,
  countries.country_name,
  countries.region
FROM events
LEFT JOIN country_dict AS countries 
  ON events.country_code = countries.code;

Complex Join Conditions

Multiple Join Keys

SELECT 
  A.user_id,
  A.session_id,
  B.event_type
FROM sessions A
INNER JOIN events B 
  ON A.user_id = B.user_id 
  AND A.session_id = B.session_id;

Additional Filter Conditions

SELECT 
  A.name,
  B.value
FROM left_stream A
LEFT JOIN right_stream B 
  ON A.department_id = B.department_id
WHERE B.value > 100;  -- Applied after the join; also drops rows from A that have no match in B
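
A WHERE condition on a right-side column runs after the join, so left rows without a match do not survive it. If the goal is to keep every row from left_stream and only restrict which right_stream rows can match, one option is to filter right_stream in a subquery before joining. This is a sketch that reuses the stream and column names above and assumes value is numeric.

```sql
-- Filter right_stream before the join so unmatched rows from A are preserved.
SELECT
  A.name,
  B.value
FROM left_stream A
LEFT JOIN (
  SELECT department_id, value
  FROM right_stream
  WHERE value > 100
) B ON A.department_id = B.department_id;
```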

Window Joins

Joining Windowed Aggregations

SELECT 
  orders_agg.window_start,
  orders_agg.order_count,
  payments_agg.payment_count
FROM (
  SELECT 
    window_start,
    window_end,
    count() as order_count
  FROM tumble(orders, order_time, INTERVAL 1 MINUTE)
  GROUP BY window_start, window_end
) AS orders_agg
INNER JOIN (
  SELECT 
    window_start,
    window_end,
    count() as payment_count
  FROM tumble(payments, payment_time, INTERVAL 1 MINUTE)
  GROUP BY window_start, window_end
) AS payments_agg
  ON orders_agg.window_start = payments_agg.window_start
  AND orders_agg.window_end = payments_agg.window_end;

Join with table() for Historical Window Analysis

-- Find windows with highest activity
WITH windowed_data AS (
  SELECT 
    window_start,
    window_end,
    count(*) as event_count
  FROM tumble(table(events), _tp_time, INTERVAL 5 MINUTE)
  GROUP BY window_start, window_end
)
SELECT 
  w1.window_start,
  w1.event_count,
  w2.max_count
FROM windowed_data w1
INNER JOIN (
  SELECT max(event_count) as max_count
  FROM windowed_data
) w2 ON w1.event_count = w2.max_count;

Subquery Joins

Filtered Subquery Join

SELECT 
  A.id,
  A.name,
  B.value
FROM left_stream A
LEFT JOIN (
  SELECT id, value 
  FROM right_stream 
  WHERE value LIKE 'X%'
) B ON A.id = B.id;

Aggregated Subquery Join

SELECT 
  live.device_id,
  live.temperature,
  hist.avg_temp
FROM sensor_readings AS live
LEFT JOIN (
  SELECT 
    device_id,
    avg(temperature) as avg_temp
  FROM table(sensor_readings)
  WHERE _tp_time > now() - INTERVAL 24 HOUR
  GROUP BY device_id
) AS hist ON live.device_id = hist.device_id
WHERE live.temperature > hist.avg_temp * 1.5;

Performance Considerations

Join Settings

Control join behavior with settings:
SELECT 
  A.id,
  B.value
FROM left_stream A
LEFT JOIN right_stream B ON A.id = B.id
SETTINGS 
  max_threads = 8,
  default_hash_join = 'hybrid';

Seek to Earliest

Start reading from the earliest available data so that the join also processes historical events:
SELECT 
  A.id,
  A.name,
  B.value
FROM left_stream A
LEFT JOIN right_stream B ON A.id = B.id
SETTINGS seek_to = 'earliest';

Complete Join Examples

Real-time Order Payment Correlation

-- Correlate orders with payments in real-time
SELECT 
  o.order_id,
  o.customer_id,
  o.amount as order_amount,
  p.payment_method,
  p.amount as payment_amount,
  p.payment_time
FROM orders o
INNER JOIN payments p 
  ON o.order_id = p.order_id
WHERE o.amount = p.amount;  -- Keep only order/payment pairs whose amounts match

User Activity Enrichment

-- Enrich events with user profile data
SELECT 
  e.event_id,
  e.event_type,
  e.event_time,
  u.user_name,
  u.user_tier,
  u.country
FROM user_events e
LEFT JOIN table(user_profiles) u 
  ON e.user_id = u.user_id;

Multi-Stream Correlation

-- Correlate bids with auctions
SELECT 
  b.bid_id,
  b.auction_id,
  b.bidder_id,
  b.bid_amount,
  a.auction_name,
  a.starting_price,
  CASE 
    WHEN b.bid_amount > a.starting_price * 2 THEN 'high'
    WHEN b.bid_amount > a.starting_price THEN 'normal'
    ELSE 'low'
  END as bid_category
FROM bids b
INNER JOIN auctions a ON b.auction_id = a.auction_id;

Window-based Join Analysis

-- Find the auctions with the highest number of bids in each hopping window
WITH auction_bids AS (
  SELECT 
    auction_id,
    count(*) as num_bids,
    window_start,
    window_end
  FROM hop(bids, bid_time, INTERVAL 2 SECOND, INTERVAL 10 SECOND)
  GROUP BY auction_id, window_start, window_end
),
max_bids AS (
  SELECT 
    max(num_bids) as max_num,
    window_start,
    window_end
  FROM auction_bids
  GROUP BY window_start, window_end
)
SELECT 
  ab.auction_id,
  ab.num_bids
FROM auction_bids ab
INNER JOIN max_bids mb 
  ON ab.window_start = mb.window_start 
  AND ab.window_end = mb.window_end
WHERE ab.num_bids >= mb.max_num;

Best Practices

  • Use INNER JOIN when you only need matching records
  • Use LEFT JOIN when left stream is primary and right is optional
  • Use table() function for joining with static reference data
  • Join on indexed columns when possible
  • Use equality conditions for best performance
  • Consider data distribution when choosing join keys

Stream-to-stream joins maintain state:
  • Consider memory implications for long-running joins
  • Use time-based filters to limit state retention
  • Monitor join state size in production

When correlating time-series data:
-- Join within same time window
SELECT * FROM 
  tumble(stream_a, time_a, INTERVAL 1 MINUTE) a
JOIN 
  tumble(stream_b, time_b, INTERVAL 1 MINUTE) b
ON a.window_start = b.window_start;

Next Steps

  • Time Windows: use time-based windows with joins
  • Aggregations: aggregate joined data
  • Materialized Views: materialize join results