The CREATE CHANGEFEED statement creates a new changefeed, which targets an allowlist of tables called "watched rows".  Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink or directly to the SQL session. 
When a changefeed emits messages to a sink, it works as a job. You can create, pause, resume, alter, or cancel a changefeed job.
To get started with changefeeds, refer to the Create and Configure Changefeeds page for important usage considerations. For details on how changefeeds emit messages, refer to the Changefeed Messages page.
The examples on this page provide the foundational syntax of the CREATE CHANGEFEED statement. For examples on more specific use cases with changefeeds, refer to the following pages:
Cockroach Labs recommends monitoring your changefeeds to track retryable errors and protected timestamp usage. Refer to the Monitor and Debug Changefeeds page for more information.
Required privileges
As of v25.1, viewing and managing a changefeed job by users with the CHANGEFEED privilege is deprecated. This functionality of the CHANGEFEED privilege will be removed in a future release.
We recommend transitioning users who need to view and manage running changefeed jobs to roles that own the jobs, or granting them the VIEWJOB or CONTROLJOB privilege. For more details, refer to View and manage changefeed jobs.
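For example, you might grant these system-level privileges as follows (the username maxroach is illustrative):
GRANT SYSTEM VIEWJOB TO maxroach;    -- maxroach is an example user
GRANT SYSTEM CONTROLJOB TO maxroach;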
Privilege model
For fine-grained access control, we recommend using the system-level privileges CHANGEFEED and CONTROLJOB / VIEWJOB.
The following summarizes the operations users can run depending on whether the assigned privileges are at the job or table level:
| Granted privileges | Usage | 
|---|---|
| CHANGEFEED | Create changefeeds on tables. For details, refer to CHANGEFEED privilege. Deprecated: View and manage changefeed jobs on tables. Instead, transition users that need to view and manage running changefeed jobs to roles that own the jobs, or grant them the VIEWJOB or CONTROLJOB privilege. For more details, refer to View and manage changefeed jobs. | 
| CHANGEFEED + USAGE on external connection | Create changefeeds on tables to an external connection URI. For details, refer to CHANGEFEED privilege. Deprecated: View and manage changefeed jobs on tables. Instead, transition users that need to view and manage running changefeed jobs to roles that own the jobs, or grant them the VIEWJOB or CONTROLJOB privilege. For more details, refer to View and manage changefeed jobs. Note: If you need to manage access to changefeed sink URIs, set the changefeed.permissions.require_external_connection_sink.enabled=true cluster setting. This means that users with these privileges can only create changefeeds on external connections. | 
| Job ownership | View and manage changefeed jobs (pause, resume, and cancel). For details, refer to View and manage changefeed jobs. | 
| CONTROLJOB | Manage changefeed jobs (pause, resume, and cancel). For details, refer to View and manage changefeed jobs. | 
| VIEWJOB | View changefeed jobs. For details, refer to View and manage changefeed jobs. | 
| SELECT | Create a sinkless changefeed that emits messages to a SQL client. | 
| Deprecated CONTROLCHANGEFEED role option + SELECT | Create changefeeds on tables. Users with the CONTROLCHANGEFEED role option must have SELECT on each table, even if they are also granted the CHANGEFEED privilege. The CONTROLCHANGEFEED role option will be removed in a future release. We recommend using the system-level privileges CHANGEFEED and CONTROLJOB / VIEWJOB for fine-grained access control. | 
| admin | Create, view, and manage changefeed jobs. | 
CHANGEFEED privilege
Viewing and managing changefeed jobs with the CHANGEFEED privilege is deprecated as of v25.1. Instead, transition users that need to view and manage running changefeed jobs to roles that own the jobs, or grant them the VIEWJOB or CONTROLJOB privilege. For more details, refer to View and manage changefeed jobs.
You can grant a user the CHANGEFEED privilege to allow them to create changefeeds on a specific table:
GRANT CHANGEFEED ON TABLE example_table TO user;
When you grant a user the CHANGEFEED privilege on a set of tables, they can create changefeeds on the target tables even if they do not have the CONTROLCHANGEFEED role option or the SELECT privilege on the tables.
These users will be able to create changefeeds, but they will not be able to run a SELECT query on that data directly. However, they could still read this data indirectly if they have read access to the sink.
You can add CHANGEFEED to the user or role's default privileges with ALTER DEFAULT PRIVILEGES:
ALTER DEFAULT PRIVILEGES GRANT CHANGEFEED ON TABLES TO user;
To restrict a user's access to changefeed data and sink credentials, enable the changefeed.permissions.require_external_connection_sink.enabled cluster setting. When you enable this setting, users with the CHANGEFEED privilege on a set of tables can only create changefeeds into external connections.
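For example, to enable the cluster setting:
SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink.enabled = true;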
View and manage changefeed jobs
Users can view and manage changefeed jobs when one of the following is met:
- Job ownership: They own the job, or are a member of a role that owns a job.
- Global privileges: They are assigned CONTROLJOB or VIEWJOB.
To give a set of users access to a specific job, or set of jobs, assign them to a role that owns the job(s).
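For example, you might create a role to own changefeed jobs and add users to it (the role and user names are illustrative):
CREATE ROLE cf_job_owner;                  -- example role that will own changefeed jobs
GRANT cf_job_owner TO maxroach, florence;  -- example users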
You can transfer ownership of a job to a role or user using the ALTER JOB statement:
ALTER JOB job_ID OWNER TO role_name;
Synopsis
Parameters
| Parameter | Description | 
|---|---|
| table_name | The name of the table (or tables in a comma-separated list) to create a changefeed for. Note: Before creating a changefeed, consider the number of changefeeds versus the number of tables to include in a single changefeed. Each scenario can have an impact on total memory usage or changefeed performance. Refer to Create and Configure Changefeeds for more detail. | 
| sink | The location of the configurable sink. The scheme of the URI indicates the type. For more information, refer to Sink URI. Note: If you create a changefeed without a sink, your changefeed will run like a basic changefeed sending messages to the SQL client. For more detail, refer to the Create and Configure Changefeeds page. | 
| option/value | For a list of available options and their values, refer to Options. | 
CDC query parameters
Change data capture queries allow you to define the change data emitted to your sink when you create a changefeed. See the Change Data Capture Queries page for detail on the functionality, syntax, and use cases for changefeeds created with queries.
| Parameter | Description | 
|---|---|
| sink | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI. | 
| option/value | For a list of available options and their values, see Options. | 
| target_list | The columns to emit data from. | 
| changefeed_target_expr | The target table for the changefeed. | 
| opt_where_clause | An optional WHERE clause to apply filters to the table. | 
Sink URI
To form the URI for each sink:
'{scheme}://{host}:{port}?{query_parameters}'
| URI Component | Description | 
|---|---|
| scheme | The type of sink, e.g., kafka, gcpubsub. | 
| host | The sink's hostname or IP address. | 
| port | The sink's port. | 
| query_parameters | The sink's query parameters. | 
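For example, a Kafka sink URI assembled from these components might look like the following (the broker hostname is a placeholder):
'kafka://broker.example.com:9092?topic_prefix=bar_&tls_enabled=true'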
For more comprehensive detail of using and configuring each sink, refer to:
- Amazon MSK
- Apache Pulsar (in Preview)
- Azure Event Hubs
- Cloud Storage / HTTP
- Confluent Cloud
- Google Cloud Pub/Sub
- Kafka
- Webhook
You can create an external connection to represent a changefeed sink URI. This allows you to specify the external connection's name in statements rather than the provider-specific URI. For detail on using external connections, see the CREATE EXTERNAL CONNECTION page.
Query parameters
Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
Query parameters include:
| Parameter | Sink Type | Type | Description | 
|---|---|---|---|
| ASSUME_ROLE | Amazon S3, Google Cloud Storage, Google Cloud Pub/Sub | STRING | Pass the ARN of the role to assume. Use in combination with AUTH=implicit or specified. external_id: Use as a value to ASSUME_ROLE to specify the external ID for third-party access to your S3 bucket. | 
| AUTH | Amazon S3, Google Cloud Storage, Google Cloud Pub/Sub, Azure Blob Storage | STRING | The authentication parameter can define either specified (default) or implicit authentication. To use specified authentication, pass your Service Account credentials with the URI. To use implicit authentication, configure these credentials via an environment variable. Refer to the Cloud Storage Authentication page for examples of each of these. | 
| api_key | Confluent Cloud | STRING | The API key created for the cluster in Confluent Cloud. | 
| api_secret | Confluent Cloud | STRING | The API key's secret generated in Confluent Cloud. Note: This must be URL-encoded before passing into the connection string. | 
| ca_cert | Kafka, webhook, Confluent schema registry | STRING | The base64-encoded ca_cert file. Specify ca_cert for a Kafka sink, webhook sink, and/or a Confluent schema registry. For usage with a Kafka sink, see Kafka Sink URI. It's necessary to state https in the schema registry's address when passing ca_cert: confluent_schema_registry='https://schema_registry:8081?ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ'. See confluent_schema_registry for more detail on using this option. Note: To encode your ca.cert, run base64 -w 0 ca.cert. | 
| client_cert | Kafka, webhook, Confluent schema registry | STRING | The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with client_key. | 
| client_key | Kafka, webhook, Confluent schema registry | STRING | The base64-encoded private key for the PEM certificate. This is used with client_cert. Note: Client keys are often encrypted. You will receive an error if you pass an encrypted client key in your changefeed statement. To decrypt the client key, run: openssl rsa -in key.pem -out key.decrypt.pem -passin pass:{PASSWORD}. Once decrypted, be sure to update your changefeed statement to use the new key.decrypt.pem file instead. | 
| file_size | cloud | STRING | The file will be flushed (i.e., written to the sink) when it exceeds the specified file size. This can be used with the WITH resolved option, which flushes on a specified cadence. When you change or increase file_size, ensure that you adjust the changefeed.memory.per_changefeed_limit cluster setting, which has a default of 512MiB. Buffering messages can quickly reach this limit if you have increased the file size. Refer to Advanced Changefeed Configuration for more detail. Default: 16MB | 
| insecure_tls_skip_verify | Kafka, webhook | BOOL | If true, disable client-side validation of responses. Note that a CA certificate is still required; this parameter means that the client will not verify the certificate. Warning: Use this query parameter with caution, as it creates MITM vulnerabilities unless combined with another method of authentication.Default: false | 
| partition_format | cloud | STRING | Specify how changefeed file paths are partitioned in cloud storage sinks. Use partition_format with the following values: daily (the default, which partitions files into date-based directories), hourly (which partitions files into hourly directories), or flat (no file partitioning). For example: CREATE CHANGEFEED FOR TABLE users INTO 'gs://...?AUTH...&partition_format=hourly' Default: daily | 
| S3_STORAGE_CLASS | Amazon S3 cloud storage sink | STRING | Specify the Amazon S3 storage class for files created by the changefeed. See Create a changefeed with an S3 storage class for the available classes and an example. Default: STANDARD | 
| sasl_aws_iam_role_arn | Amazon MSK | STRING | The ARN for the IAM role that has the permissions to create a topic and send data to the topic. For more details on setting up an Amazon MSK cluster with an IAM role, refer to the AWS documentation. | 
| sasl_aws_iam_session_name | Amazon MSK | STRING | The user-specified string that identifies the session in AWS. | 
| sasl_aws_region | Amazon MSK | STRING | The region of the Amazon MSK cluster. | 
| sasl_client_id | Kafka | STRING | Client ID for OAuth authentication from a third-party provider. This parameter is only applicable with sasl_mechanism=OAUTHBEARER. | 
| sasl_client_secret | Kafka | STRING | Client secret for OAuth authentication from a third-party provider. This parameter is only applicable with sasl_mechanism=OAUTHBEARER. Note: You must base64 encode this value when passing it in as part of a sink URI. | 
| sasl_enabled | Amazon MSK, Azure Event Hubs, Kafka, Confluent Cloud | BOOL | If true, set the authentication protocol with the sasl_mechanism parameter. You must have tls_enabled set to true to use SASL. For Confluent Cloud and Azure Event Hubs sinks, this is set to true by default. Default: false | 
| sasl_grant_type | Kafka | STRING | Override the default OAuth client credentials grant type for other implementations. This parameter is only applicable with sasl_mechanism=OAUTHBEARER. | 
| sasl_handshake | Azure Event Hubs, Kafka, Confluent Cloud | BOOL | For Confluent Cloud and Azure Event Hubs sinks, this is set to true by default. | 
| sasl_mechanism | Amazon MSK, Azure Event Hubs, Kafka, Confluent Cloud | STRING | Can be set to OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN. sasl_user and sasl_password are required for PLAIN and SCRAM authentication. For Amazon MSK clusters, set to AWS_MSK_IAM; sasl_aws_iam_role_arn, sasl_aws_iam_session_name, and sasl_aws_region are also required in the sink URI. Refer to the Connect to a Changefeed Kafka sink with OAuth Using Okta tutorial for detail on setting up OAuth using Okta. For Confluent Cloud and Azure Event Hubs sinks, sasl_mechanism=PLAIN is required but set automatically by CockroachDB. Default: PLAIN | 
| sasl_scopes | Kafka | STRING | A list of scopes that the OAuth token should have access to. This parameter is only applicable with sasl_mechanism=OAUTHBEARER. | 
| sasl_token_url | Kafka | STRING | Client token URL for OAuth authentication from a third-party provider. Note: You must URL encode this value before passing in a URI. This parameter is only applicable with sasl_mechanism=OAUTHBEARER. | 
| sasl_user | Amazon MSK, Kafka | STRING | Your SASL username. | 
| sasl_password | Amazon MSK, Kafka | STRING | Your SASL password. Note: Passwords should be URL encoded since the value can contain characters that would cause authentication to fail. | 
| shared_access_key | Azure Event Hubs | STRING | The URL-encoded key for your Event Hub shared access policy. | 
| shared_access_key_name | Azure Event Hubs | STRING | The name of your Event Hub shared access policy. | 
| tls_enabled | Amazon MSK, Kafka, Confluent Cloud | BOOL | If true, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with the ca_cert parameter. For Confluent Cloud and Azure Event Hubs sinks, this is set to true by default. Default: false | 
| topic_name | Azure Event Hubs, Kafka, Confluent Cloud, GC Pub/Sub | STRING | Allows arbitrary topic naming for Kafka and GC Pub/Sub topics. See the Kafka topic naming limitations or GC Pub/Sub topic naming for detail on supported characters etc. For example, CREATE CHANGEFEED FOR foo,bar INTO 'kafka://sink?topic_name=all' will emit all records to a topic named all. Note that schemas will still be registered separately. When using Kafka, this parameter can be combined with the topic_prefix parameter (this is not supported for GC Pub/Sub). Default: table name. | 
| topic_prefix | Azure Event Hubs, Kafka, Confluent Cloud | STRING | Adds a prefix to all topic names. For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo. | 
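For example, a changefeed to a cloud storage sink that combines several of these query parameters might look like the following (the bucket name is a placeholder, and implicit authentication is assumed to be configured):
CREATE CHANGEFEED FOR TABLE table_name
  INTO 'gs://{BUCKET NAME}?AUTH=implicit&partition_format=hourly&file_size=32MB'
  WITH resolved;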
Options
| Option | Value | Description | 
|---|---|---|
| avro_schema_prefix | Schema prefix name | Provide a namespace for the schema of a table in addition to the default, the table name. This allows multiple databases or clusters to share the same schema registry when the same table name is present in multiple databases. Example: CREATE CHANGEFEED FOR foo WITH format=avro, confluent_schema_registry='registry_url', avro_schema_prefix='super' will register subjects as superfoo-key and superfoo-value with the namespace super. | 
| compression | gzip, zstd | Compress changefeed data files written to a cloud storage sink or a webhook sink. | 
| confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use avro. Use the timeout={duration} query parameter (duration string) in your Confluent Schema Registry URI to change the default timeout for contacting the schema registry. By default, the timeout is 30 seconds. To connect to Confluent Cloud, use the following URL structure: 'https://{API_KEY_ID}:{API_SECRET_URL_ENCODED}@{CONFLUENT_REGISTRY_URL}:443'. See the Stream a Changefeed to a Confluent Cloud Kafka Cluster tutorial for further detail. Use the changefeed.schema_registry.retry_count metric to measure the number of request retries performed when sending requests to the schema registry. For more detail on monitoring changefeeds, refer to Monitor and Debug Changefeeds. | 
| cursor | Timestamp | Emit any changes after the given timestamp. cursor does not output the current state of the table first. When cursor is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. The changefeed will encounter an error if you specify a timestamp that is before the configured garbage collection window for the target table. (Refer to gc.ttlseconds.) With default garbage collection settings, this means you cannot create a changefeed that starts more than the default MVCC garbage collection interval in the past. You can use cursor to start a new changefeed where a previous changefeed ended. Example: cursor='1536242855577149065.0000000000' | 
| diff | N/A | Publish a before field with each message, which includes the value of the row before the update was applied. Changefeeds must use the diff option with the default wrapped envelope or the enriched envelope to emit the before field. | 
| encode_json_value_null_as_object | N/A | Emit JSON NULL values as {"__crdb_json_null__": true} to distinguish these values from SQL NULL values. Refer to the Changefeed Messages page for an example. Note: When this option is enabled, if the changefeed encounters the literal value {"__crdb_json_null__": true} in JSON, it will have the same representation as a JSON NULL value and a warning will be printed to the DEV logging channel. | 
| end_time | Timestamp | Indicate the timestamp up to which the changefeed will emit all events and then complete with a successful status. Provide a future timestamp to end_time in number of nanoseconds since the Unix epoch. For example, end_time="1655402400000000000". You cannot use end_time and initial_scan = 'only' simultaneously. | 
| envelope | wrapped/enriched/bare/key_only/row | wrapped is the default envelope structure for changefeed messages, containing an array of the primary key, a top-level field for the type of message, and the current state of the row (or null for deleted rows). Refer to the Changefeed Message Envelopes page for more detail on each envelope. Default: envelope=wrapped. Default for CDC queries: envelope=bare. | 
| execution_locality | Key-value pairs | Restricts the execution of a changefeed to nodes that match the defined locality filter requirements, e.g., WITH execution_locality = 'region=us-west-1a,cloud=aws'. See Run a changefeed job by locality for usage and reference detail. | 
| format | json/avro/csv/parquet | Format of the emitted message. avro: For mappings of CockroachDB types to Avro types, refer to the table and detail on Avro limitations. Note: confluent_schema_registry is required with format=avro. csv: You cannot combine format=csv with the diff or resolved options. Changefeeds use the same CSV format as the EXPORT statement. Refer to Export data with changefeeds for details on using these options to create a changefeed as an alternative to EXPORT. Note: initial_scan = 'only' is required with format=csv. parquet: Cloud storage is the only supported sink. The topic_in_value option is not compatible with parquet format. Default: format=json. | 
| full_table_name | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases. Note: This option cannot modify existing table names used as topics, subjects, etc., as part of an ALTER CHANGEFEED statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option. Example: CREATE CHANGEFEED FOR foo... WITH full_table_name will create the topic name defaultdb.public.foo instead of foo. | 
| gc_protect_expires_after | Duration string | Automatically expires protected timestamp records that are older than the defined duration. In the case where a changefeed job remains paused, gc_protect_expires_after will trigger the underlying protected timestamp record to expire and cancel the changefeed job to prevent accumulation of protected data. Refer to Protect Changefeed Data from Garbage Collection for more detail on protecting changefeed data. | 
| New in v25.2: headers_json_column_name | STRING | Specify a JSONB column that the changefeed emits as Kafka headers, separate from the message payload, for each row's change event. headers_json_column_name is supported for Kafka sinks. For more details, refer to Specify a column as a Kafka header. | 
| ignore_disable_changefeed_replication | BOOL | When set to true, the changefeed will emit events even if CDC filtering for TTL jobs is configured using the disable_changefeed_replication session variable, sql.ttl.changefeed_replication.disabled cluster setting, or the ttl_disable_changefeed_replication table storage parameter. Refer to Filter changefeeds for tables using TTL for usage details. | 
| initial_scan | yes/no/only | Control whether or not an initial scan will occur at the start time of a changefeed. Only one initial_scan option (yes, no, or only) can be used. If none of these are set, an initial scan will occur if there is no cursor, and will not occur if there is one. This preserves the behavior from previous releases. With initial_scan = 'only' set, the changefeed job will end with a successful status (succeeded) after the initial scan completes. You cannot specify yes, no, and only simultaneously. If used in conjunction with cursor, an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now(). Although the initial_scan/no_initial_scan syntax from previous versions is still supported, you cannot combine the previous and current syntax. Default: initial_scan = 'yes' | 
| kafka_sink_config | STRING | Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. Set the message file compression type. See Kafka sink configuration for more detail on configuring all the available fields for this option. Example: CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}' | 
| key_column | 'column' | Override the key used in message metadata. This changes the key hashed to determine downstream partitions. In sinks that support partitioning by message, CockroachDB uses the 32-bit FNV-1a hashing algorithm to determine which partition to send to. Note: key_column does not preserve ordering of messages from CockroachDB to the downstream sink, therefore you must also include the unordered option in your changefeed creation statement. It does not affect per-key ordering guarantees or the output of key_in_value. See the Define a key to determine the changefeed sink partition example. | 
| key_in_value | N/A | Add a primary key array to the emitted message. This makes the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). key_in_value is automatically used for cloud storage sinks, webhook sinks, and GC Pub/Sub sinks. | 
| lagging_ranges_threshold | Duration string | Set a duration from the present that determines the length of time a range is considered to be lagging behind, which is then tracked in the lagging_ranges metric. Note that ranges undergoing an initial scan for longer than the threshold duration are considered to be lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric for each range in the table. As ranges complete the initial scan, the number of ranges lagging behind will decrease. Default: 3m | 
| lagging_ranges_polling_interval | Duration string | Set the interval rate for when lagging ranges are checked and the lagging_ranges metric is updated. Polling adds latency to the lagging_ranges metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterward. Default: 1m | 
| metrics_label | STRING | Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated. The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels. WITH metrics_label=label_name For more detail on usage and considerations, see Using changefeed metrics labels. | 
| min_checkpoint_frequency | Duration string | Controls how often a node's changefeed aggregator will flush its progress to the coordinating changefeed node. A node's changefeed aggregator will wait at least the specified duration between sending progress updates for the ranges it is watching to the coordinator. This can help you control the flush frequency of higher latency sinks to achieve better throughput. However, more frequent checkpointing can increase CPU usage. If this is set to 0s, a node will flush messages as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then min_checkpoint_frequency is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time. Note: resolved messages will not be emitted more frequently than the configured min_checkpoint_frequency (but may be emitted less frequently). If you require resolved messages more frequently than 30s, you must configure min_checkpoint_frequency to at least the desired resolved message frequency. For more details, refer to Resolved message frequency. Default: 30s | 
| mvcc_timestamp | N/A | Include the MVCC timestamp for each emitted row in a changefeed. With the mvcc_timestamp option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. | 
| on_error | pause/fail | Use on_error=pause to pause the changefeed when encountering non-retryable errors. on_error=pause will pause the changefeed instead of sending it into a terminal failure state. Note: Retryable errors will continue to be retried with this option specified. Use with protect_data_from_gc_on_pause to protect changes from garbage collection. If a changefeed with on_error=pause is running when a watched table is truncated, the changefeed will pause but will not be able to resume reads from that table. Using ALTER CHANGEFEED to drop the table from the changefeed and then resuming the job will work, but you cannot add the same table to the changefeed again. Instead, you will need to create a new changefeed for that table. Default: on_error=fail | 
| protect_data_from_gc_on_pause | N/A | This option is deprecated as of v23.2 and will be removed in a future release. When a changefeed is paused, ensure that the data needed to resume the changefeed is not garbage collected. If protect_data_from_gc_on_pause is unset, pausing the changefeed will release the existing protected timestamp records. It is also important to note that pausing and adding protect_data_from_gc_on_pause to a changefeed will not protect data if the garbage collection window has already passed. Use with on_error=pause to protect changes from garbage collection when encountering non-retryable errors. Refer to Protect Changefeed Data from Garbage Collection for more detail on protecting changefeed data. Note: If you use this option, changefeeds that are left paused for long periods of time can prevent garbage collection. Use with the gc_protect_expires_after option to set a limit for protected data and for how long a changefeed will remain paused. | 
| pubsub_sink_config | STRING | Set fields to configure sink batching and retries. The schema is as follows: { "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }. Note that if either Messages or Bytes are nonzero, then a non-zero value for Frequency must be provided. Refer to Pub/Sub sink configuration for more details on using this option. | 
| resolved | Duration string | Emit resolved timestamps in a format dependent on the connected sink. Resolved timestamps do not emit until the changefeed job's progress has been checkpointed. Set a minimum amount of time that the changefeed's high-water mark (overall resolved timestamp) must advance by before another resolved timestamp is emitted. Example: resolved='10s'. This option will only emit a resolved timestamp if the timestamp has advanced (and by at least the optional duration, if set). If a duration is unspecified, all resolved timestamps are emitted as the high-water mark advances. Note: If you set resolved lower than 30s, then you must also set min_checkpoint_frequency to at minimum the same value as resolved, because resolved messages may be emitted less frequently than min_checkpoint_frequency, but cannot be emitted more frequently. Refer to Resolved messages for more detail. | 
| schema_change_events | default/column_changes | The type of schema change event that triggers the behavior specified by the schema_change_policy option: default or column_changes. Default: schema_change_events=default | 
| schema_change_policy | backfill/nobackfill/stop | The behavior to take when an event specified by the schema_change_events option occurs: backfill, nobackfill, or stop. Default: schema_change_policy=backfill | 
| split_column_families | N/A | Use this option to create a changefeed on a table with multiple column families. The changefeed will emit messages for each of the table's column families. See Changefeeds on tables with column families for more usage detail. | 
| topic_in_value | BOOL | Set to include the topic in each emitted row update. This option is automatically set for webhook sinks. Note: topic_in_value is not compatible with changefeeds running in parquet format. | 
| unordered | N/A | Run a changefeed to Google Cloud Pub/Sub without specifying a region. You must include the unordered option with key_column in your changefeed creation statement. You cannot use unordered with resolved, because resolved timestamps may not be correct in unordered mode. | 
| updated | N/A | Include updated timestamps with each row. If a cursor is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a cursor is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid. | 
| virtual_columns | STRING | Changefeeds omit virtual computed columns from emitted messages by default. To maintain the behavior of previous CockroachDB versions where the changefeed would emit NULL values for virtual computed columns, set virtual_columns = "null" when you start a changefeed. You may also define virtual_columns = "omitted", though this is already the default behavior for v22.1+. If you do not set "omitted" on a table with virtual computed columns when you create a changefeed, you will receive a warning that changefeeds will filter out virtual computed values. Default: "omitted" | 
| webhook_auth_header | STRING | Pass a value (password, token, etc.) to the HTTP Authorization header with a webhook request for a "Basic" HTTP authentication scheme. Example: With a username of "user" and password of "pwd", add a colon between "user:pwd" and then base64 encode, which results in "dXNlcjpwd2Q=". WITH webhook_auth_header='Basic dXNlcjpwd2Q='. | 
| webhook_client_timeout | INTERVAL | If a response is not received from the sink within this timeframe, the request will error and the connection will be retried. Note this must be a positive value. Default: "3s" | 
| webhook_sink_config | STRING | Set fields to configure sink batching and retries. The schema is as follows: { "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }. Note that if either Messages or Bytes are nonzero, then a non-zero value for Frequency must be provided. See Webhook sink configuration for more details on using this option. | 
Using the format=avro, envelope=key_only, and updated options together is rejected. envelope=key_only prevents any rows with updated fields from being emitted, which makes the updated option meaningless.
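For example, a changefeed that combines several compatible options might look like the following (the external connection name is a placeholder); because resolved is set below 30s, min_checkpoint_frequency is lowered to match:
CREATE CHANGEFEED FOR TABLE table_name
  INTO 'external://kafka_sink'
  WITH diff, resolved = '10s', min_checkpoint_frequency = '10s';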
Files
The files emitted to a sink use the following naming conventions:
The timestamp format is YYYYMMDDHHMMSSNNNNNNNNNLLLLLLLLLL.
General file format
/[date]/[timestamp]-[uniquer]-[topic]-[schema-id]
For example:
/2020-04-02/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson
When emitting changefeed messages to a cloud storage sink, you can specify a partition format for your files using the partition_format query parameter. This will result in the following file path formats:
- daily: This is the default option and will follow the same pattern as the previous general file format.
- hourly: This will partition into an hourly directory as the changefeed emits messages, like the following:- /2020-04-02/20/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson
- flat: This will result in no file partitioning. The cloud storage path you specify when creating a changefeed will store all of the message files with no additional directories created.
Resolved file format
/[date]/[timestamp].RESOLVED
For example:
/2020-04-04/202004042351304139680000000000000.RESOLVED
Examples
Before running any of the examples in this section, you must enable the kv.rangefeed.enabled cluster setting. If you are working on a CockroachDB Standard or Basic cluster, this cluster setting is enabled by default.
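To enable rangefeeds:
SET CLUSTER SETTING kv.rangefeed.enabled = true;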
The following examples show the syntax for managing changefeeds and starting changefeeds with different use cases and features. The Options table on this page provides a list of all the available options. For information on sink-specific query parameters and configurations, refer to the Changefeed Sinks page.
You can create an external connection to represent a changefeed sink URI. This allows you to specify the external connection's name in statements rather than the provider-specific URI. For detail on using external connections, see the CREATE EXTERNAL CONNECTION page.
We recommend limiting the number of changefeeds per cluster to 80.
Create a changefeed connected to a sink
CREATE CHANGEFEED FOR TABLE table_name, table_name2, table_name3
  INTO 'scheme://host:port'
  WITH updated, resolved;
You can connect a changefeed to the following sinks:
- Amazon MSK
- Apache Pulsar (in Preview)
- Azure Event Hubs
- Cloud Storage / HTTP
- Confluent Cloud
- Google Cloud Pub/Sub
- Kafka
- Webhook
Create a changefeed that filters and transforms change data
CDC queries can filter and transform change data before emitting it to a sink or a SQL client.
You can adapt a changefeed with CDC queries by including SELECT and WHERE clauses in your CREATE statement:
CREATE CHANGEFEED INTO 'scheme://host:port'
  WITH updated, resolved
  AS SELECT owner_id, status
  FROM vehicles
  WHERE status = 'lost';
CDC queries can only run on a single table per changefeed.
Create a sinkless changefeed
You can create a changefeed that will send messages to the SQL client rather than a sink:
CREATE CHANGEFEED FOR TABLE table_name, table_name2, table_name3
  WITH updated, resolved;
To create a sinkless changefeed using CDC queries:
CREATE CHANGEFEED WITH updated, resolved
  AS SELECT owner_id, status
  FROM vehicles
  WHERE status = 'lost';
Use an external connection to specify a changefeed sink
External connections provide a way to define a name for a sink, which you can use instead of the provider-specific URI.
To restrict a user's access to changefeed data and sink credentials, enable the changefeed.permissions.require_external_connection_sink.enabled cluster setting. When you enable this setting, users with the CHANGEFEED privilege on a set of tables can only create changefeeds into external connections.
External connections support all changefeed sinks.
CREATE EXTERNAL CONNECTION kafka_sink
  AS 'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert={certificate}&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256';
In the changefeed statement, you specify the external connection name:
CREATE CHANGEFEED FOR TABLE table_name INTO 'external://kafka_sink'
  WITH resolved;
Filter changefeeds for tables using row-level TTL
Use the ttl_disable_changefeed_replication table storage parameter to prevent changefeeds from sending DELETE messages issued by row-level TTL jobs for a table. Include the storage parameter when you create or alter the table. For example:
CREATE TABLE tbl (
  id UUID PRIMARY KEY default gen_random_uuid(),
  value TEXT
) WITH (ttl_expire_after = '3 weeks', ttl_job_cron = '@daily', ttl_disable_changefeed_replication = 'true');
ALTER TABLE events SET (ttl_expire_after = '1 year', ttl_disable_changefeed_replication = 'true');
You can also widen the scope to the cluster by setting the sql.ttl.changefeed_replication.disabled cluster setting to true. This will prevent changefeeds from emitting deletes issued by all TTL jobs on a cluster.
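For example:
SET CLUSTER SETTING sql.ttl.changefeed_replication.disabled = true;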
If you want to have a changefeed ignore the storage parameter or cluster setting that disables changefeed replication, you can set the changefeed option ignore_disable_changefeed_replication to true:
CREATE CHANGEFEED FOR TABLE table_name INTO 'external://changefeed-sink'
  WITH resolved, ignore_disable_changefeed_replication = true;
This is useful when you have multiple use cases for different changefeeds on the same table. For example, you have a table with a changefeed streaming changes to another database for analytics workflows in which you do not want to reflect row-level TTL deletes. Secondly, you have a changefeed on the same table for audit-logging purposes for which you need to persist every change through the changefeed.
For guidance on how to filter changefeed messages to emit row-level TTL deletes only, refer to Change Data Capture Queries.
Disallow schema changes on tables to improve changefeed performance
Use the schema_locked storage parameter to disallow schema changes on a watched table, which allows the changefeed to take a fast path that avoids checking if there are schema changes that could require synchronization between changefeed aggregators. This helps to decrease the latency between a write committing to a table and it emitting to the changefeed's sink.
Enable schema_locked on the watched table with the ALTER TABLE statement:
ALTER TABLE watched_table SET (schema_locked = true);
While schema_locked is enabled on a table, attempted schema changes on the table will be rejected and an error returned. If you need to run a schema change on the locked table, unlock the table with schema_locked = false, complete the schema change, and then lock the table again with schema_locked = true. The changefeed will run as normal while schema_locked = false, but it will not benefit from the performance optimization.
ALTER TABLE watched_table SET (schema_locked = false);
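Then run the schema change and lock the table again (the added column is illustrative):
ALTER TABLE watched_table ADD COLUMN note STRING;  -- example schema change
ALTER TABLE watched_table SET (schema_locked = true);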
Manage a changefeed
For changefeed jobs, use SHOW CHANGEFEED JOBS to check the status:
SHOW CHANGEFEED JOBS;
Use the following SQL statements to pause, resume, or cancel a changefeed.
Pause a changefeed
PAUSE JOB job_id;
For more information, see PAUSE JOB.
Resume a paused changefeed
RESUME JOB job_id;
For more information, see RESUME JOB.
Cancel a changefeed
CANCEL JOB job_id;
For more information, see CANCEL JOB.
Modify a changefeed
To modify a changefeed, pause the job and then use:
ALTER CHANGEFEED job_id {ADD table | DROP table | SET option | UNSET option};
You can add new table targets, remove them, set new changefeed options, and unset them.
For more information, see ALTER CHANGEFEED.
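For example, a single ALTER CHANGEFEED statement can add a table and set an option at the same time (the table name is illustrative):
ALTER CHANGEFEED job_id ADD table_name2 SET resolved = '30s';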
Configuring all changefeeds
It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.
To pause all running changefeeds:
PAUSE JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('running'));
This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.
To resume all running changefeeds:
RESUME JOBS (WITH x AS (SHOW CHANGEFEED JOBS) SELECT job_id FROM x WHERE status = ('paused'));
This will resume the changefeeds and update the status for each of the changefeeds to running.
Start a new changefeed where another ended
In some situations, you may want to start a changefeed where a previously running changefeed ended. For example, a changefeed could encounter an error it cannot recover from, such as when a TRUNCATE is performed, and you need to restart the changefeed.
- Use SHOW CHANGEFEED JOB to find the high-water timestamp for the ended changefeed:
  SHOW CHANGEFEED JOB {job_id};
          job_id       | ... |      high_water_timestamp      | ...
  ---------------------+-----+--------------------------------+-----
    383870400694353921 | ... | 1537279405671006870.0000000000 | ...
  (1 row)
  Note: If a changefeed has failed, you must restart the changefeed from a timestamp after the event that caused the failure.
- Use the high_water_timestamp to start the new changefeed:
  CREATE CHANGEFEED FOR TABLE table_name, table_name2, table_name3
    INTO 'scheme://host:port'
    WITH cursor = '<high_water_timestamp>';
When you use the cursor option to start a changefeed, it will not perform an initial scan.
Create a changefeed with an S3 storage class
To associate the changefeed message files with a specific storage class in your Amazon S3 bucket, use the S3_STORAGE_CLASS parameter with the class. For example, the following S3 connection URI specifies the INTELLIGENT_TIERING storage class:
CREATE CHANGEFEED FOR TABLE table_name
  INTO 's3://{BUCKET NAME}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}&S3_STORAGE_CLASS=INTELLIGENT_TIERING'
  WITH resolved;
Use the parameter to set one of the storage classes listed in Amazon's documentation. For more general usage information, see Amazon's Using Amazon S3 storage classes documentation.
Define a key to determine the changefeed sink partition
With the key_column option, you can define the key used in message metadata that determines the partition for the changefeed message at your downstream sink. This option overrides the default primary key:
CREATE CHANGEFEED FOR TABLE table_name
  INTO 'external://kafka-sink'
  WITH key_column='partition_column', unordered;
key_column does not preserve ordering of messages from CockroachDB to the downstream sink, therefore you must include the unordered option. It does not affect per-key ordering guarantees or the output of key_in_value.