CREATE CHANGEFEED is an enterprise-only feature. For the core version, see EXPERIMENTAL CHANGEFEED FOR.
The CREATE CHANGEFEED statement creates a new enterprise changefeed, which targets an allowlist of tables, called "watched rows".  Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka or a cloud storage sink). You can create, pause, resume, or cancel an enterprise changefeed.
For more information, see Change Data Capture.
Required privileges
Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.
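For example, an existing member of the admin role can add a SQL user to it (the user name max below is a placeholder):

> GRANT admin TO max;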
Considerations
- In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an at-least-once delivery guarantee. For more information, see Change Data Capture - Ordering Guarantees.
Synopsis
Parameters
| Parameter | Description | 
|---|---|
| table_name | The name of the table (or tables in a comma-separated list) to create a changefeed for. Note: Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a single changefeed with a comma-separated list of tables. | 
| sink | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI below. | 
| option/value | For a list of available options and their values, see Options below. | 
Sink URI
The sink URI follows the basic format of:
'{scheme}://{host}:{port}?{query_parameters}'
| URI Component | Description | 
|---|---|
| scheme | The type of sink: kafka or any cloud storage sink. | 
| host | The sink's hostname or IP address. | 
| port | The sink's port. | 
| query_parameters | The sink's query parameters. | 
Kafka
Example of a Kafka sink URI:
'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user=petee&sasl_password=bones'
Cloud storage sink
Use a cloud storage sink to deliver changefeed data to OLAP or big data systems without requiring transport via Kafka.
Currently, cloud storage sinks only work with JSON and emit newline-delimited JSON files.
Any of the cloud storage services below can be used as a sink:
The scheme for a cloud storage sink should be prepended with experimental-.
[scheme]://[host]/[path]?[parameters]
| Location | Scheme | Host | Parameters | 
|---|---|---|---|
| Amazon | s3 | Bucket name | AUTH 1 (optional; can be implicit or specified), AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN | 
| Azure | azure | N/A (see Example file URLs) | AZURE_ACCOUNT_KEY, AZURE_ACCOUNT_NAME | 
| Google Cloud 2 | gs | Bucket name | AUTH (optional; can be default, implicit, or specified), CREDENTIALS | 
| HTTP 3 | http | Remote host | N/A | 
| NFS/Local 4 | nodelocal | nodeID or self 5 (see Example file URLs) | N/A | 
| S3-compatible services 6 | s3 | Bucket name | AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION 7 (optional), AWS_ENDPOINT | 
The location parameters often contain special characters that need to be URI-encoded. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
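For example, if a hypothetical AWS secret key contains the characters / and +, they must appear percent-encoded (%2F and %2B) in the sink URI. The table, bucket, and credentials below are placeholders:

> CREATE CHANGEFEED FOR TABLE orders
  INTO 'experimental-s3://acme-co-changefeeds/orders?AWS_ACCESS_KEY_ID=AKIAEXAMPLE&AWS_SECRET_ACCESS_KEY=ab%2Fcd%2Bef'
  WITH updated;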
If your environment requires an HTTP or HTTPS proxy server for outgoing connections, you can set the standard HTTP_PROXY and HTTPS_PROXY environment variables when starting CockroachDB.
If you cannot run a full proxy, you can disable external HTTP(S) access (as well as custom HTTP(S) endpoints) when performing bulk operations (e.g., BACKUP, RESTORE, etc.) by using the --external-io-disable-http flag. You can also disable the use of implicit credentials when accessing external cloud storage services for various bulk operations by using the --external-io-disable-implicit-credentials flag.
- 1 If the AUTH parameter is not provided, AWS connections default to specified and the access keys must be provided in the URI parameters. If the AUTH parameter is implicit, the access keys can be omitted and the credentials will be loaded from the environment.
- 2 If the AUTH parameter is not specified, the cloudstorage.gs.default.key cluster setting will be used if it is non-empty, otherwise the implicit behavior is used. If the AUTH parameter is implicit, all GCS connections use Google's default authentication strategy. If the AUTH parameter is default, the cloudstorage.gs.default.key cluster setting must be set to the contents of a service account file, which will be used during authentication. If the AUTH parameter is specified, GCS connections are authenticated on a per-statement basis, which allows the JSON key object to be sent in the CREDENTIALS parameter. The JSON key object should be Base64-encoded (using the standard encoding in RFC 4648).
- 3 You can create your own HTTP server with Caddy or nginx. A custom root CA can be appended to the system's default CAs by setting the cloudstorage.http.custom_ca cluster setting, which will be used when verifying certificates from HTTPS URLs.
- 4 The file system backup location on the NFS drive is relative to the path specified by the --external-io-dir flag set while starting the node. If the flag is set to disabled, then imports from local directories and NFS drives are disabled.
- 5 Using a nodeID is required and the data files will be in the extern directory of the specified node. In most cases (including single-node clusters), using nodelocal://1/<path> is sufficient. Use self if you do not want to specify a nodeID, and the individual data files will be in the extern directories of arbitrary nodes; however, to work correctly, each node must have the --external-io-dir flag point to the same NFS mount or other network-backed, shared storage.
- 6 A custom root CA can be appended to the system's default CAs by setting the cloudstorage.http.custom_ca cluster setting, which will be used when verifying certificates from an S3-compatible service.
- 7 The AWS_REGION parameter is optional since it is not a required parameter for most S3-compatible services. Specify the parameter only if your S3-compatible service requires it.
Example file URLs
| Location | Example | 
|---|---|
| Amazon S3 | s3://acme-co/employees?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456 | 
| Azure | azure://employees?AZURE_ACCOUNT_KEY=123&AZURE_ACCOUNT_NAME=acme-co | 
| Google Cloud | gs://acme-co | 
| HTTP | http://localhost:8080/employees | 
| NFS/Local | nodelocal://1/path/employees, nodelocal://self/nfsmount/backups/employees 5 | 
Query parameters
Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
Query parameters include:
| Parameter | Sink Type | Description | 
|---|---|---|
| topic_prefix | Kafka, cloud | Type: STRING. Adds a prefix to all topic names. For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo. | 
| tls_enabled=true | Kafka | Type: BOOL. If true, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a ca_cert (see below). | 
| ca_cert | Kafka | Type: STRING. The base64-encoded ca_cert file. Note: To encode your ca.cert, run base64 -w 0 ca.cert. | 
| client_cert | Kafka | Type: STRING. The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with client_key. | 
| client_key | Kafka | Type: STRING. The base64-encoded private key for the PEM certificate. This is used with client_cert. | 
| sasl_enabled | Kafka | Type: BOOL. If true, use SASL/PLAIN to authenticate. This requires a sasl_user and sasl_password (see below). | 
| sasl_user | Kafka | Type: STRING. Your SASL username. | 
| sasl_password | Kafka | Type: STRING. Your SASL password. | 
| file_size | cloud | Type: STRING. The file will be flushed (i.e., written to the sink) when it exceeds the specified file size. This can be used with the WITH resolved option, which flushes on a specified cadence. Default: 16MB | 
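For example, a changefeed pointed at a hypothetical Google Cloud Storage bucket that authenticates implicitly and flushes files once they exceed 32MB might look like the following (the table and bucket names are placeholders):

> CREATE CHANGEFEED FOR TABLE orders
  INTO 'experimental-gs://acme-co-changefeeds/orders?AUTH=implicit&file_size=32MB'
  WITH resolved = '10s';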
Options
| Option | Value | Description | 
|---|---|---|
| updated | N/A | Include updated timestamps with each row. If a cursor is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a cursor is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid. | 
| resolved | INTERVAL | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted. Example: resolved='10s' | 
| envelope | key_only/wrapped | Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes. Default: envelope=wrapped | 
| cursor | Timestamp | Emit any changes after the given timestamp, but do not output the current state of the table first. If cursor is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. When starting a changefeed at a specific cursor, the cursor cannot be before the configured garbage collection window (see gc.ttlseconds) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past. cursor can be used to start a new changefeed where a previous changefeed ended. Example: CURSOR='1536242855577149065.0000000000' | 
| format | json/experimental_avro | Format of the emitted record. Currently, support for Avro is limited and experimental. For mappings of CockroachDB types to Avro types, see the table below. Default: format=json. | 
| confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use experimental_avro. | 
| key_in_value | N/A | Make the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). key_in_value is automatically used for these sinks (currently only cloud storage sinks). | 
| diff | N/A | New in v20.1: Publish a before field with each message, which includes the value of the row before the update was applied. | 
| compression | gzip | New in v20.1: Compress changefeed data files written to a cloud storage sink. Currently, only Gzip is supported for compression. | 
| protect_data_from_gc_on_pause | N/A | New in v20.1: When a changefeed is paused, ensure that the data needed to resume the changefeed is not garbage collected. Note: If you use this option, changefeeds left paused can prevent garbage collection for long periods of time. | 
| schema_change_events | default/column_changes | New in v20.1: The type of schema change event that triggers the behavior specified by the schema_change_policy option. Default: schema_change_events=default | 
| schema_change_policy | backfill/nobackfill/stop | The behavior to take when an event specified by the schema_change_events option occurs. Default: schema_change_policy=backfill | 
| initial_scan/no_initial_scan | N/A | New in v20.1: Control whether or not an initial scan will occur at the start time of a changefeed. initial_scan and no_initial_scan cannot be used simultaneously. If neither initial_scan nor no_initial_scan is specified, an initial scan will occur if there is no cursor, and will not occur if there is one. This preserves the behavior from previous releases. Default: initial_scan. If used in conjunction with cursor, an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now(). | 
New in v20.1: Using the format=experimental_avro, envelope=key_only, and updated options together is rejected. envelope=key_only prevents any rows with updated fields from being emitted, which makes the updated option meaningless.
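As an illustrative sketch combining several of the options above (the table name and broker address are placeholders), a changefeed that publishes the previous state of each row, stops on column-level schema changes, and protects data from garbage collection while paused could look like:

> CREATE CHANGEFEED FOR TABLE orders
  INTO 'kafka://broker.address.com:9092'
  WITH diff, resolved = '10s',
       schema_change_events = column_changes,
       schema_change_policy = stop,
       protect_data_from_gc_on_pause;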
Avro limitations
Currently, support for Avro is limited and experimental. Below is a list of unsupported SQL types and values for Avro changefeeds:
- Decimals must have precision specified.
- Decimals with NaN or infinite values cannot be written in Avro. Note: To avoid NaN or infinite values, add a CHECK constraint to prevent these values from being inserted into decimal columns (see the sketch after this list).
- TIME, DATE, INTERVAL, UUID, INET, ARRAY, JSONB, BIT, and collated STRING are not supported in Avro yet.
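As a sketch of the CHECK constraint mentioned above (the table products and column price are hypothetical, and this is only one possible formulation), the following rejects NaN and infinite values before they can reach an Avro changefeed:

> ALTER TABLE products
  ADD CONSTRAINT price_is_finite
  CHECK (price != 'NaN'::DECIMAL AND price NOT IN ('Infinity'::DECIMAL, '-Infinity'::DECIMAL));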
Avro types
Below is a mapping of CockroachDB types to Avro types:
| CockroachDB Type | Avro Type | Avro Logical Type | 
|---|---|---|
| INT | LONG | |
| BOOL | BOOLEAN | |
| FLOAT | DOUBLE | |
| STRING | STRING | |
| DATE | INT | DATE | 
| TIME | LONG | TIME-MICROS | 
| TIMESTAMP | LONG | TIME-MICROS | 
| TIMESTAMPTZ | LONG | TIME-MICROS | 
| DECIMAL | BYTES | DECIMAL | 
| UUID | STRING | |
| INET | STRING | |
| JSONB | STRING | |
Responses
Messages
The messages (i.e., keys and values) emitted to a Kafka topic are specific to the envelope. The default format is wrapped, and the output messages are composed of the following:
- Key: An array always composed of the row's PRIMARY KEY field(s) (e.g., [1] for JSON or {"id":{"long":1}} for Avro).
- Value:
  - One of three possible top-level fields:
    - after, which contains the state of the row after the update (or null for DELETEs).
    - updated, which contains the updated timestamp.
    - resolved, which is emitted for records representing resolved timestamps. These records do not include an "after" value since they only function as checkpoints.
  - For INSERT and UPDATE, the current state of the row inserted or updated.
  - For DELETE, null.
For example:
| Statement | Response | 
|---|---|
| INSERT INTO office_dogs VALUES (1, 'Petee'); | JSON: [1] {"after": {"id": 1, "name": "Petee"}} Avro: {"id":{"long":1}} {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee"}}}} | 
| DELETE FROM office_dogs WHERE name = 'Petee' | JSON: [1] {"after": null} Avro: {"id":{"long":1}} {"after":null} | 
Files
The files emitted to a sink use the following naming conventions:
The timestamp format is YYYYMMDDHHMMSSNNNNNNNNNLLLLLLLLLL.
General file format
/[date]/[timestamp]-[uniquer]-[topic]-[schema-id]
For example:
/2020-04-02/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson
Resolved file format
/[date]/[timestamp].RESOLVED
For example:
/2020-04-04/202004042351304139680000000000000.RESOLVED
Examples
Create a changefeed connected to Kafka
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed connected to Kafka, see Change Data Capture.
Create a changefeed connected to Kafka using Avro
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH format = experimental_avro, confluent_schema_registry = <schema_registry_address>;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed that emits an Avro record, see Change Data Capture.
Create a changefeed connected to a cloud storage sink
This is an experimental feature. The interface and output are subject to change.
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'experimental-scheme://host?parameters'
  WITH updated, resolved;
+--------------------+
|       job_id       |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed connected to a cloud storage sink, see Change Data Capture.
Manage a changefeed
Use the following SQL statements to pause, resume, and cancel a changefeed.
Changefeed-specific SQL statements (e.g., CANCEL CHANGEFEED) will be added in the future.
Pause a changefeed
> PAUSE JOB job_id;
For more information, see PAUSE JOB.
Resume a paused changefeed
> RESUME JOB job_id;
For more information, see RESUME JOB.
Cancel a changefeed
> CANCEL JOB job_id;
For more information, see CANCEL JOB.
Start a new changefeed where another ended
Find the high-water timestamp for the ended changefeed:
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
        job_id       |  job_type  | ... |      high_water_timestamp      | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
  383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 |       |              1
(1 row)
Use the high_water_timestamp to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH cursor = '<high_water_timestamp>';
Note that because the cursor is provided, the initial scan is not performed.