Skip to main content

External Streams

External streams let Timeplus Proton read from and write to external systems such as Kafka, Redpanda, Pulsar, NATS JetStream, HTTP endpoints, and other Timeplus instances without duplicating the data. They act as virtual streams that proxy reads and writes to the external system.

Overview

Key characteristics:
  • No ingestion step: queries read directly from the external system
  • No data stored in Proton (data stays in source system)
  • Bi-directional: read and write support
  • Schema on read: define columns to extract
  • Multiple formats: JSON, Avro, Protobuf, CSV, etc.
External streams are well suited to real-time ETL pipelines: data flows from source to sink without first being stored in Proton.

Kafka / Redpanda

The most common external stream type for production workloads.

Basic Connection

-- Minimal Kafka external stream: each message arrives unparsed in the
-- single `raw` column.
CREATE EXTERNAL STREAM kafka_events (raw string)
SETTINGS type = 'kafka', brokers = 'localhost:9092', topic = 'events';

Structured Schema

-- Typed Kafka external stream: each JSONEachRow message is decoded into
-- the four columns declared below.
CREATE EXTERNAL STREAM user_events (
  user_id    uint64,
  event_type string,
  timestamp  datetime64(3),
  properties string
)
SETTINGS
  type        = 'kafka',
  brokers     = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
  topic       = 'user-events',
  data_format = 'JSONEachRow';

Reading from Kafka

-- Streaming scan: rows are emitted as new messages arrive.
SELECT * FROM kafka_events;

-- Keep only purchases and render the event time as text.
SELECT
  user_id,
  event_type,
  to_string(timestamp) AS event_time
FROM user_events
WHERE event_type = 'purchase';

-- Live aggregation per event type over the incoming stream.
SELECT
  event_type,
  count()                 AS total,
  count(DISTINCT user_id) AS unique_users
FROM user_events
GROUP BY event_type;

-- Offset control with seek_to.
-- Replay the topic from the earliest available message:
SELECT * FROM kafka_events SETTINGS seek_to = 'earliest';

-- Read only new messages (the default):
SELECT * FROM kafka_events SETTINGS seek_to = 'latest';

-- Begin at a point in time given as a Unix timestamp in milliseconds.
-- NOTE(review): confirm this numeric form is accepted by seek_to in your Proton version.
SELECT * FROM kafka_events
SETTINGS seek_to = '1640000000000';

Writing to Kafka

-- Output external stream: rows written here are serialized as JSONEachRow
-- and sent to the 'processed-events' topic.
CREATE EXTERNAL STREAM kafka_output (
  user_id uint64,
  event_type string,
  processed_time datetime64(3)
)
SETTINGS 
  type='kafka',
  brokers='localhost:9092',
  topic='processed-events',
  data_format='JSONEachRow';

-- Direct insert. The explicit column list keeps the statement correct
-- if the stream's column order ever changes.
INSERT INTO kafka_output (user_id, event_type, processed_time)
VALUES (12345, 'click', now64());

-- Continuous ETL via materialized view.
-- Reads from user_events, the typed stream that exposes user_id and
-- event_type; kafka_events only has a single `raw` column.
CREATE MATERIALIZED VIEW mv_process INTO kafka_output AS
SELECT 
  user_id,
  event_type,
  now64() AS processed_time
FROM user_events
WHERE event_type IN ('click', 'purchase');

Authentication

-- SASL/PLAIN over TLS.
-- Credentials below are placeholders; do not commit real ones.
CREATE EXTERNAL STREAM secure_kafka(raw string)
SETTINGS 
  type='kafka',
  brokers='kafka.example.com:9093',
  topic='secure-topic',
  security_protocol='SASL_SSL',
  sasl_mechanism='PLAIN',
  username='my-user',
  password='my-password';

-- SASL/SCRAM-SHA-256 over TLS (SCRAM-SHA-512 is also listed as a mechanism).
-- This uses a distinct name: re-using secure_kafka would fail because that
-- stream was already created above.
CREATE EXTERNAL STREAM secure_kafka_scram(raw string)
SETTINGS 
  type='kafka',
  brokers='kafka.example.com:9093',
  topic='secure-topic',
  security_protocol='SASL_SSL',
  sasl_mechanism='SCRAM-SHA-256',
  username='my-user',
  password='my-password';

-- Amazon MSK with IAM authentication; no username/password settings are given.
CREATE EXTERNAL STREAM msk_stream(raw string)
SETTINGS 
  type='kafka',
  brokers='b-1.msk.us-west-2.amazonaws.com:9098',
  topic='my-topic',
  security_protocol='SASL_SSL',
  sasl_mechanism='AWS_MSK_IAM';

-- TLS only, verifying the broker certificate against a CA file
CREATE EXTERNAL STREAM ssl_kafka(raw string)
SETTINGS 
  type='kafka',
  brokers='kafka.example.com:9093',
  topic='secure-topic',
  security_protocol='SSL',
  ssl_ca_cert_file='/path/to/ca-cert.pem';

-- Or provide CA cert inline
CREATE EXTERNAL STREAM ssl_kafka2(raw string)
SETTINGS 
  type='kafka',
  brokers='kafka.example.com:9093',
  topic='secure-topic',
  security_protocol='SSL',
  ssl_ca_pem='-----BEGIN CERTIFICATE-----\n...';

-- Skip certificate verification (not recommended for production)
CREATE EXTERNAL STREAM ssl_kafka3(raw string)
SETTINGS 
  type='kafka',
  brokers='kafka.example.com:9093',
  topic='secure-topic',
  security_protocol='SSL',
  skip_ssl_cert_check=true;

Data Formats

Proton supports multiple serialization formats, selected with the `data_format` setting. For example:
-- JSONEachRow: one JSON object per line, with keys matching the
-- columns declared below.
CREATE EXTERNAL STREAM json_stream (
  id        uint64,
  name      string,
  timestamp datetime64(3)
)
SETTINGS type        = 'kafka',
         brokers     = 'localhost:9092',
         topic       = 'json-topic',
         data_format = 'JSONEachRow';

Advanced Configuration

CREATE EXTERNAL STREAM advanced_kafka (
  user_id uint64,
  data    string
)
SETTINGS
  -- connection and payload format
  type        = 'kafka',
  brokers     = 'localhost:9092',
  topic       = 'events',
  data_format = 'JSONEachRow',
  -- extra Kafka client properties, passed as one ';'-separated string
  properties  = 'max.poll.interval.ms=300000;session.timeout.ms=45000;fetch.min.bytes=1024',
  -- consumer tuning
  poll_waittime_ms          = 500,    -- how long each poll waits, in ms
  consumer_stall_timeout_ms = 60000,  -- consumer is recreated after stalling this long
  -- write side: pick the target partition from user_id
  sharding_expr       = 'user_id',
  one_message_per_row = true;         -- each row becomes its own Kafka message
From source code (ExternalStreamSettings.h):
// Key configuration options
// Excerpt only: each M(...) line declares one external-stream setting.
// NOTE(review): argument order appears to be (type, name, default, description, flags),
// following the ClickHouse settings-macro convention -- confirm against the header.
// NOTE(review): the sasl_mechanism description does not mention AWS_MSK_IAM, which
// the MSK example on this page uses -- confirm which mechanisms are accepted.
M(String, brokers, "", "Comma-separated list of brokers", 0)
M(String, topic, "", "Kafka topic name", 0)
M(String, security_protocol, "plaintext", "Connection protocol", 0)
M(String, sasl_mechanism, "", "PLAIN, SCRAM-SHA-256, SCRAM-SHA-512", 0)
M(UInt64, poll_waittime_ms, 500, "Poll wait time in ms", 0)
M(Milliseconds, consumer_stall_timeout_ms, 60000, "Consumer stall timeout", 0)

Apache Pulsar

Basic Connection

-- Pulsar external stream decoding JSONEachRow messages into three columns.
CREATE EXTERNAL STREAM pulsar_stream (
  user_id    uint64,
  event_type string,
  data       string
)
SETTINGS
  type        = 'pulsar',
  service_url = 'pulsar://localhost:6650',             -- non-TLS Pulsar URL
  topic       = 'persistent://public/default/events',  -- persistent://tenant/namespace/topic
  data_format = 'JSONEachRow';

Pulsar Authentication

-- Token (JWT) authentication over a TLS service URL (pulsar+ssl://).
CREATE EXTERNAL STREAM pulsar_jwt(raw string)
SETTINGS
  type        = 'pulsar',
  service_url = 'pulsar+ssl://pulsar.example.com:6651',
  topic       = 'persistent://tenant/namespace/topic',
  jwt         = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...';  -- truncated placeholder token

Pulsar Configuration

CREATE EXTERNAL STREAM pulsar_advanced (payload string)
SETTINGS
  type                   = 'pulsar',
  service_url            = 'pulsar://localhost:6650',
  topic                  = 'persistent://public/default/events',
  -- client tuning
  connections_per_broker = 5,           -- more connections for higher throughput
  io_threads             = 4,           -- parallel I/O threads
  memory_limit           = 1073741824;  -- 1 GiB (1024^3 bytes)

HTTP Streams

Read from HTTP Endpoint

-- Read JSONEachRow rows from an HTTP endpoint using GET.
-- NOTE(review): confirm your Proton version supports reading (not only writing) HTTP external streams.
CREATE EXTERNAL STREAM http_source (
  id    uint64,
  value string
)
SETTINGS
  type        = 'http',
  url         = 'https://api.example.com/events',
  read_method = 'GET',
  data_format = 'JSONEachRow';

Write to HTTP Endpoint

-- Send rows to a webhook with POST, as gzip-compressed JSONEachRow
-- using chunked transfer encoding.
CREATE EXTERNAL STREAM http_sink (
  event_id   uint64,
  event_data string,
  timestamp  datetime64(3)
)
SETTINGS
  type                 = 'http',
  url                  = 'https://webhook.example.com/ingest',
  write_method         = 'POST',
  data_format          = 'JSONEachRow',
  compression_method   = 'gzip',
  use_chunked_encoding = true;

HTTP Authentication

-- Token-based auth via custom headers (a Bearer token plus an API key).
-- The value is a comma-separated list of 'Name: value' pairs.
-- NOTE(review): a header value that itself contains a comma cannot be expressed
-- in this list form -- confirm the exact http_headers syntax for your version.
CREATE EXTERNAL STREAM http_auth (
  data string
)
SETTINGS 
  type='http',
  url='https://api.example.com/stream',
  http_headers='Authorization: Bearer token123,X-API-Key: key456';

NATS JetStream

-- NATS JetStream: a durable consumer with explicit acks reading all
-- messages whose subjects match events.> from the EVENTS stream.
CREATE EXTERNAL STREAM nats_stream (
  subject   string,
  data      string,
  timestamp datetime64(3)
)
SETTINGS
  type             = 'nats',
  brokers          = 'nats://localhost:4222',
  -- which JetStream stream and subjects to read
  stream_name      = 'EVENTS',
  subject          = 'events.>',          -- '>' wildcard matches the remaining tokens
  -- consumer behavior
  consumer_name    = 'proton-consumer',
  durable          = true,
  ack_policy       = 'explicit',
  deliver_policy   = 'all',
  batch_size       = 256,
  fetch_timeout_ms = 5000;                -- ms

Timeplus Cloud / Enterprise

Connect to another Timeplus instance:
-- Proxy a stream that lives in another Timeplus instance.
CREATE EXTERNAL STREAM remote_timeplus (
  user_id    uint64,
  event_type string,
  timestamp  datetime64(3)
)
SETTINGS
  type     = 'timeplus',
  hosts    = 'timeplus.example.com:8463',
  db       = 'analytics',  -- remote database
  stream   = 'events',     -- remote stream
  user     = 'readonly',
  password = 'secret',     -- placeholder; do not commit real credentials
  secure   = true;

Real-World Examples

Kafka ETL Pipeline

From examples/ecommerce/etl.sql:
-- Source: raw, unparsed frontend events from Redpanda.
CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS type='kafka', brokers='redpanda:9092', topic='owlshop-frontend-events';

-- Sink: masked events, written as JSONEachRow to a second topic.
CREATE EXTERNAL STREAM target(
  _tp_time datetime64(3),
  url      string,
  method   string,
  ip       string
)
SETTINGS
  type='kafka',
  brokers='redpanda:9092',
  topic='masked-fe-event',
  data_format='JSONEachRow';

-- Pipeline: pull fields out of the JSON payload and replace the client IP
-- with its MD5 digest rendered as lowercase hex.
-- NOTE: an unsalted MD5 of an IPv4 address can be reversed by enumerating all
-- 2^32 addresses; use a salted or keyed hash if real anonymization is required.
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT
  now64()                        AS _tp_time,
  raw:requestedUrl               AS url,
  raw:method                     AS method,
  lower(hex(md5(raw:ipAddress))) AS ip
FROM frontend_events;

CDC Replication

From examples/cdc/cdc.sql:
-- Debezium change events for inventory.customers, kept as raw JSON.
CREATE EXTERNAL STREAM customers_cdc(raw string)
SETTINGS
  type    = 'kafka',
  brokers = 'redpanda:9092',
  topic   = 'dbserver1.inventory.customers';

-- Changelog key-value stream holding the current state per customer id;
-- _tp_time serves as the version column.
CREATE STREAM customers(
  id         int,
  first_name string,
  last_name  string,
  email      string
)
PRIMARY KEY id
SETTINGS mode = 'changelog_kv', version_column = '_tp_time';

-- Process INSERT events. Debezium emits op='c' for rows created while the
-- connector is running and op='r' for rows read during its initial snapshot.
-- Both mean "this row now exists", so both are applied with _tp_delta = +1.
-- Because this view seeks to 'earliest', filtering on 'c' alone would drop
-- every pre-existing row captured by the snapshot.
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS 
SELECT 
  to_time(raw:payload.ts_ms) AS _tp_time,
  raw:payload.after.id::int AS id,
  raw:payload.after.first_name AS first_name,
  raw:payload.after.last_name AS last_name,
  raw:payload.after.email AS email,
  1::int8 AS _tp_delta
FROM customers_cdc 
WHERE raw:payload.op IN ('c', 'r')
SETTINGS seek_to='earliest';

-- Process UPDATE events (emit old and new)
-- Each Debezium 'u' event becomes two changelog rows: the before-image with
-- _tp_delta = -1 (retract) and the after-image with _tp_delta = +1 (apply).
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS 
WITH cdc_changes AS (
  SELECT 
    -- ts_ms holds the to_time() result, not the raw millisecond value
    to_time(raw:payload.ts_ms) AS ts_ms,
    -- (row image, delta) pairs; array_join below fans this out into two rows
    [(raw:payload.before, -1::int8), (raw:payload.after, 1::int8)] AS changes
  FROM customers_cdc
  WHERE raw:payload.op = 'u' 
  SETTINGS seek_to = 'earliest'
)
-- NOTE(review): array_join(changes) appears five times below. This relies on
-- identical array_join expressions being expanded once (ClickHouse semantics)
-- rather than producing a cross product. Confirm, or alias it once in a subquery.
SELECT 
  ts_ms AS _tp_time,
  array_join(changes).1:id::int AS id,
  array_join(changes).1:first_name AS first_name,
  array_join(changes).1:last_name AS last_name,
  array_join(changes).1:email AS email,
  array_join(changes).2 AS _tp_delta
FROM cdc_changes;

-- Process DELETE events: retract (-1) the row's last known state, taken
-- from the Debezium before-image.
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
SELECT
  to_time(raw:payload.ts_ms)    AS _tp_time,
  raw:payload.before.id::int    AS id,
  raw:payload.before.first_name AS first_name,
  raw:payload.before.last_name  AS last_name,
  raw:payload.before.email      AS email,
  -1::int8                      AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op = 'd'
SETTINGS seek_to = 'earliest';

Performance Tuning

1. Batch size

-- Increase batch size for higher throughput: the broker answers a fetch once
-- at least fetch.min.bytes are available or fetch.wait.max.ms has elapsed.
-- Property names are librdkafka's (fetch.wait.max.ms), not the Java
-- client's (fetch.max.wait.ms).
SETTINGS 
  properties='fetch.min.bytes=65536;fetch.wait.max.ms=500'
2. Parallel consumers

-- High-volume topic. Kafka read parallelism is bounded by the topic's
-- partition count, so scale out by adding partitions on the Kafka side.
-- Here the consumer's local prefetch queue is enlarged so more messages
-- stay buffered between polls.
-- Use librdkafka property names; Java-client names such as
-- max.poll.records are not librdkafka properties.
CREATE EXTERNAL STREAM kafka_parallel(raw string)
SETTINGS 
  type='kafka',
  brokers='localhost:9092',
  topic='high-volume',
  properties='queued.min.messages=500000;queued.max.messages.kbytes=262144';
3. Compression

-- Enable producer-side compression for writes. Choose exactly one codec via
-- librdkafka's compression.type: none, gzip, snappy, lz4 or zstd.
-- Setting a second, different codec elsewhere (e.g. compression_method)
-- leaves it ambiguous which one is applied.
SETTINGS 
  properties='compression.type=lz4'

Troubleshooting

-- Inspect consumer state, including lag
SELECT * FROM system.kafka_consumers;

-- Give a slow consumer longer before it is treated as stalled (120 s)
ALTER STREAM kafka_events
  MODIFY SETTING consumer_stall_timeout_ms = 120000;
-- Allow more time to connect to a slow broker
CREATE EXTERNAL STREAM kafka_slow(raw string)
SETTINGS
  type                  = 'kafka',
  brokers               = 'slow-kafka:9092',
  topic                 = 'events',
  connection_timeout_ms = 30000;  -- 30 s
-- Keep each message as one raw string and extract fields at query time
CREATE EXTERNAL STREAM kafka_raw(raw string)
SETTINGS
  type    = 'kafka',
  brokers = 'localhost:9092',
  topic   = 'events';

-- Parse with error handling: try_cast for the numeric id.
-- NOTE(review): confirm try_cast is available in your Proton version and what it returns on failure.
SELECT
  raw,
  try_cast(raw:id AS uint64) AS id,
  raw:name::string           AS name
FROM kafka_raw;

Next Steps

Materialized Views

Build streaming ETL pipelines

Data Formats

Supported serialization formats

Kafka Tutorial

End-to-end Kafka integration

CDC Tutorial

Change Data Capture patterns