Description
Describe the bug
When creating a connector, setting the state to anything other than RUNNING is ignored. It should be possible to create connectors in a state other than RUNNING, i.e., STOPPED or PAUSED.
To Reproduce
- Create a valid connector with the value for state set to either STOPPED or PAUSED.
- On success, the output of the changes will read as:
"spec" : {
..., {
"name" : "state",
"op" : "CREATE",
"after" : "PAUSED"
}, {
...
- curl the connector to get the status and we'll see that its state is RUNNING:
curl localhost:8083/connectors/catalog-changelog-d1/status
{
"name": "catalog-changelog-d1",
"connector": {
"state": "RUNNING",
"worker_id": "100.114.231.248:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "100.114.231.248:8083"
}
],
"type": "sink"
}
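The check in the last step can be scripted. Below is a minimal Python sketch that compares the connector state in a status response against the state that was requested. It assumes only the response shape shown above. The helper name `state_mismatch` is hypothetical and is not part of Jikkou or Kafka Connect.

```python
import json
from typing import Optional


def state_mismatch(status_json: str, expected_state: str) -> Optional[str]:
    """Return a description of the mismatch, or None if the states agree.

    `status_json` is the body returned by GET /connectors/<name>/status.
    Only the connector-level state is compared; task states are ignored.
    """
    status = json.loads(status_json)
    actual = status["connector"]["state"]
    if actual != expected_state:
        return f"{status['name']}: expected {expected_state}, got {actual}"
    return None


# Status body trimmed from the output in this report.
reported = """
{
  "name": "catalog-changelog-d1",
  "connector": {"state": "RUNNING", "worker_id": "100.114.231.248:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "100.114.231.248:8083"}],
  "type": "sink"
}
"""

print(state_mismatch(reported, "PAUSED"))
```

With the status body from this report and a requested state of PAUSED, the helper reports a mismatch, which is the bug described here.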
Expected behavior
The expected behavior is a connector that's newly created and initialized in the STOPPED or PAUSED state. The state could be any valid initial state other than RUNNING.
Screenshots/Configs
Initial running state on apply

Actual state after apply
Runtime environment
- OS: macOS Sequoia 15.7.1
- Jikkou: 0.35.7
- Docker version: N/A
- Kafka Cluster Version: cp-7.9.2
Additional context
I'm able to achieve this by posting a JSON file directly to the Connect cluster with a key named initial_state whose value is the string STOPPED or PAUSED. For example:
{
"name": "catalog-changelog-d1",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"topics.dir": "<redacted>",
"flush.size": "100",
"tasks.max": "1",
"timezone": "UTC",
"enhanced.avro.schema.support": "false",
"locale": "en-US",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"aws.access.key.id": "",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"s3.bucket.name": "<redacted>",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"partition.duration.ms": "600000",
"schema.compatibility": "NONE",
"topics": "catalog.changelog",
"connect.meta.data": "false",
"aws.secret.access.key": "",
"value.converter.schema.registry.url": "<redacted>",
"s3.compression.type": "gzip",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"value.converter.schemas.enable": "false",
"name": "catalog-changelog-d1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH"
},
"initial_state": "STOPPED"
}
KIP-980 was accepted to allow for this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state
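For reference, the workaround above can be sketched in Python using only the standard library. This assumes a Connect worker new enough to support KIP-980 and reachable at the same base URL used in the curl call (http://localhost:8083). The function name `build_create_request` is hypothetical, and the snippet only builds the request. Sending it is left as a commented-out line.

```python
import json
import urllib.request

# Initial states described in KIP-980.
VALID_INITIAL_STATES = {"RUNNING", "PAUSED", "STOPPED"}


def build_create_request(base_url: str, name: str, config: dict,
                         initial_state: str = "STOPPED") -> urllib.request.Request:
    """Build a POST /connectors request that carries an initial_state field."""
    state = initial_state.upper()
    if state not in VALID_INITIAL_STATES:
        raise ValueError(f"unsupported initial_state: {initial_state!r}")
    body = json.dumps(
        {"name": name, "config": config, "initial_state": state}
    ).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Abbreviated config for illustration; the full config is shown above.
request = build_create_request(
    "http://localhost:8083",
    "catalog-changelog-d1",
    {"connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "1"},
    initial_state="STOPPED",
)
# To actually send it against a live worker:
# urllib.request.urlopen(request)
```

The fix requested here would presumably have Jikkou include the same initial_state field when it creates a connector, though how that maps onto Jikkou's internals is not something this sketch covers.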