CREATE EXTERNAL STREAM
Create an external stream to read from or write to Kafka.
Basic Example
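The Python sketch below renders a basic CREATE EXTERNAL STREAM statement. It has a column list followed by a SETTINGS clause with the three required settings: type, brokers and topic. The helper names `render_create_external_stream` and `sql_literal` are hypothetical. The stream name, column and topic are placeholders. The string escaping assumes ClickHouse-style literals, where backslashes and single quotes are escaped with a backslash.

```python
# Hypothetical helper (not part of Proton): renders a CREATE EXTERNAL STREAM
# statement from a column list and a settings dict.


def sql_literal(value):
    """Render a Python value as a SQL literal, assuming ClickHouse-style escaping."""
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return str(value)
    # Escape backslashes first, then single quotes.
    text = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{text}'"


def render_create_external_stream(name, columns, settings):
    """Build the DDL; `columns` is a list of (name, type) pairs."""
    cols = ",\n    ".join(f"{col} {typ}" for col, typ in columns)
    opts = ",\n    ".join(f"{key}={sql_literal(val)}" for key, val in settings.items())
    return f"CREATE EXTERNAL STREAM {name} (\n    {cols}\n)\nSETTINGS\n    {opts}"


# Placeholder names; only type, brokers and topic are required.
ddl = render_create_external_stream(
    "frontend_events",
    [("raw", "string")],
    {"type": "kafka", "brokers": "localhost:9092", "topic": "frontend-events"},
)
print(ddl)
```

Optional settings from the tables below, such as `security_protocol` or `one_message_per_row=True`, can be added to the same dict.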
Authentication
SASL/PLAIN
SASL/SCRAM
Supported mechanisms: SCRAM-SHA-256, SCRAM-SHA-512
AWS MSK IAM Authentication
SSL/TLS Configuration
Configuration Options
Core Settings
| Setting | Type | Description | Required |
|---|---|---|---|
| type | String | Must be 'kafka' or 'redpanda' | Yes |
| brokers | String | Comma-separated list of brokers | Yes |
| topic | String | Kafka topic name | Yes |
| security_protocol | String | Protocol: plaintext, SASL_PLAINTEXT, SASL_SSL, SSL | No (default: plaintext) |
| sasl_mechanism | String | SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM | No |
| username | String | SASL username | No |
| password | String | SASL password | No |
| ssl_ca_cert_file | String | Path to CA certificate file | No |
| ssl_ca_pem | String | CA certificate string (PEM format) | No |
| skip_ssl_cert_check | Bool | Skip SSL certificate verification | No (default: false) |
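You can check the rules in the Core Settings table on the client side before running the DDL. The hypothetical `validate_core_settings` function below encodes the required settings and allowed values from the table. It also applies two rules that are assumptions, not documented Proton behavior:

- Each broker entry must have host:port form.
- A SASL protocol needs a `sasl_mechanism`, and every mechanism except AWS_MSK_IAM also needs `username` and `password`.

```python
# Hypothetical client-side check for the core settings of a Kafka external stream.
# Required keys and allowed values mirror the Core Settings table; the broker
# host:port shape and the credential rules are assumptions.
REQUIRED = ("type", "brokers", "topic")
STREAM_TYPES = {"kafka", "redpanda"}
PROTOCOLS = {"plaintext", "sasl_plaintext", "sasl_ssl", "ssl"}  # compared case-insensitively
MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "AWS_MSK_IAM"}


def validate_core_settings(settings):
    """Return a list of problems; an empty list means no issue was detected."""
    problems = [f"missing required setting: {key}" for key in REQUIRED if not settings.get(key)]

    stream_type = settings.get("type")
    if stream_type and stream_type not in STREAM_TYPES:
        problems.append(f"type must be 'kafka' or 'redpanda', got {stream_type!r}")

    # Each non-empty entry of the comma-separated list should look like host:port.
    for broker in filter(None, (b.strip() for b in (settings.get("brokers") or "").split(","))):
        host, sep, port = broker.rpartition(":")
        if not sep or not host or not port.isdigit():
            problems.append(f"broker {broker!r} is not in host:port form")

    protocol = settings.get("security_protocol", "plaintext")
    if protocol.lower() not in PROTOCOLS:
        problems.append(f"unknown security_protocol: {protocol!r}")

    mechanism = settings.get("sasl_mechanism")
    if mechanism is not None and mechanism not in MECHANISMS:
        problems.append(f"unknown sasl_mechanism: {mechanism!r}")

    # Assumption: SASL needs a mechanism; PLAIN and SCRAM also need credentials.
    if protocol.lower().startswith("sasl_"):
        if mechanism is None:
            problems.append("SASL protocol requires sasl_mechanism")
        elif mechanism != "AWS_MSK_IAM" and not (settings.get("username") and settings.get("password")):
            problems.append(f"{mechanism} requires username and password")
    return problems


print(validate_core_settings({"type": "kafka", "brokers": "broker-1:9092", "topic": "orders"}))  # prints []
```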
Advanced Settings
| Setting | Type | Description | Default |
|---|---|---|---|
| properties | String | Semicolon-separated Kafka client properties | Empty |
| one_message_per_row | Bool | Produce one message per row for row-based formats | false |
| data_format | String | Message format (e.g., JSONEachRow, Avro) | JSONEachRow |
| poll_waittime_ms | UInt64 | Poll wait time in milliseconds | 500 |
| consumer_stall_timeout_ms | Milliseconds | Consumer stall timeout in milliseconds | 60000 |
| connection_timeout_ms | Milliseconds | Connection timeout in milliseconds | 10000 |
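The sketch below illustrates what `one_message_per_row` changes for a row-based format such as JSONEachRow. It encodes rows as newline-delimited compact JSON objects. It then either emits one payload per row or joins all rows into a single payload. The function name is hypothetical. Modeling `false` as "the whole batch goes into one message" is a simplification. This sketch does not specify how Proton sizes batches or whether single-row messages keep the trailing newline.

```python
import json


# Hypothetical model of how one_message_per_row affects a row-based format.
# JSONEachRow is treated as one compact JSON object per line; batching every row
# into a single payload when the flag is false is a simplifying assumption.
def encode_json_each_row(rows, one_message_per_row):
    """Return the list of Kafka message payloads produced for `rows` (dicts)."""
    lines = [json.dumps(row, separators=(",", ":")) + "\n" for row in rows]
    if one_message_per_row:
        return lines  # one payload per row
    return ["".join(lines)] if lines else []  # all rows share one payload


rows = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
print(len(encode_json_each_row(rows, True)), len(encode_json_each_row(rows, False)))  # prints 2 1
```

Per-row messages let each row carry its own key and be consumed independently. This is the "per-row control" that the best practices below refer to.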
Custom Kafka Properties
Use the properties setting to pass additional Kafka client configurations:
key1=value1;key2=value2;...
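The properties string is a flat list of key=value pairs joined with semicolons. The hypothetical helpers below build and parse it in Python. No escaping mechanism is documented, so the builder assumes that `;` never appears and that keys never contain `=`. The parser splits each pair on the first `=` only. The property names in the usage line are ordinary librdkafka-style settings chosen as examples; this sketch does not confirm whether Proton forwards any particular key.

```python
# Hypothetical helpers for the `properties` setting (key1=value1;key2=value2;...).
# No escaping is assumed, so ';' may not appear anywhere and '=' may not appear in keys.
def build_properties(props):
    """Join a dict of client properties into a single properties string."""
    for key, value in props.items():
        if ";" in f"{key}{value}" or "=" in str(key):
            raise ValueError(f"cannot encode property {key!r}={value!r}")
    return ";".join(f"{key}={value}" for key, value in props.items())


def parse_properties(text):
    """Split a properties string into a dict of strings, skipping empty segments."""
    result = {}
    for pair in filter(None, (part.strip() for part in text.split(";"))):
        key, sep, value = pair.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"missing '=' in {pair!r}")
        result[key.strip()] = value.strip()
    return result


props = build_properties({"queued.min.messages": 1000000, "fetch.max.bytes": 52428800})
print(props)  # prints queued.min.messages=1000000;fetch.max.bytes=52428800
```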
Reserved Virtual Columns
Proton provides virtual columns for Kafka metadata:
| Column | Type | Description |
|---|---|---|
| _tp_append_time | DateTime64(3, 'UTC') | Message append timestamp |
| _tp_event_time | DateTime64(3, 'UTC') | Event timestamp |
| _tp_process_time | DateTime64(3, 'UTC') | Processing timestamp |
| _tp_shard | Int32 | Kafka partition ID |
| _tp_sn | Int64 | Kafka offset (sequence number) |
| _tp_message_key | String | Message key |
| _tp_message_headers | Map(String, String) | Message headers |
Using Message Keys
Define _tp_message_key as a physical column to control message keys when writing:
Reading from Kafka
Streaming Query
Historical Query
Read Specific Partitions
Seek to Position
Writing to Kafka
INSERT Statement
Materialized View
Complete Example: Kafka to ClickHouse ETL
AWS MSK Example
Complete example with IAM authentication:
Schema Registry Support
For Avro and Protobuf with schema registry:
Best Practices
- Use SASL_SSL for production: Always encrypt data in transit.
- Set appropriate timeouts: Adjust consumer_stall_timeout_ms based on topic activity.
- Enable one_message_per_row: Use it with row-based formats when you need per-row control.
- Partition wisely: Use _tp_message_key or sharding_expr for consistent partitioning.
- Monitor offsets: Query _tp_sn to track processing progress.
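Monitoring with _tp_sn usually means comparing two offsets per partition:

- The highest offset processed so far, for example `max(_tp_sn)` grouped by `_tp_shard`.
- The partition's high watermark, which is the offset the next produced message will receive.

The hypothetical `partition_lag` function below does that arithmetic. The sketch does not say how to collect the two inputs; a Proton query could supply the first and Kafka's own tooling the second. Partitions with no processed rows are assumed to start at offset 0.

```python
# Hypothetical lag calculation for monitoring progress via _tp_sn.
def partition_lag(processed, high_watermarks):
    """Compute per-partition lag.

    processed:       {partition (_tp_shard): highest _tp_sn processed so far}
    high_watermarks: {partition: offset the next produced message will receive}
    Partitions absent from `processed` are assumed to start at offset 0, which
    overstates lag if retention has already deleted the oldest messages.
    """
    lag = {}
    for shard, high_watermark in high_watermarks.items():
        last = processed.get(shard)
        next_offset = 0 if last is None else last + 1
        lag[shard] = max(high_watermark - next_offset, 0)  # never report negative lag
    return lag


print(partition_lag({0: 99, 1: 41}, {0: 100, 1: 50, 2: 7}))  # prints {0: 0, 1: 8, 2: 7}
```

A lag that keeps growing in one partition while the others stay near zero often points to a skewed key distribution. This ties back to the advice on partitioning with _tp_message_key.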