Reset Kafka Connect Source Connector Offsets
Starting a Kafka Connect sink connector at the end of a topic
When you create a sink connector in Kafka Connect, by default it will start reading from the beginning of the topic and stream all of the existing—and new—data to the target. The setting that controls this behaviour is auto.offset.reset, and you can see its value in the worker log when the connector runs:
[2019-08-05 23:31:35,405] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
…
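To have the connector start at the end of the topic instead, you can change this setting to latest. As a rough sketch (exact support depends on your Kafka Connect version), either set it for the whole worker, or, on versions that allow per-connector client overrides, set it in the connector's own config:
# In the Connect worker properties: applies to every connector on that worker
consumer.auto.offset.reset=latest
# Or per connector (requires connector.client.config.override.policy=All on the worker);
# add this to the connector's JSON config:
#   "consumer.override.auto.offset.reset": "latest"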
Resetting a Consumer Group in Kafka
Migrating Alfred Clipboard to New Laptop
So how DO you make those cool diagrams? July 2019 update
Taking the Vienna-Munich sleeper train
Manually delete a connector from Kafka Connect
Kafka Connect has a REST API through which all config should be done, including removing connectors that have been created. Sometimes though, you might have reason to want to do this manually—and since Kafka Connect running in distributed mode uses Kafka as its persistent data store, you can achieve this by manually writing to the topic yourself.
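For example, you can inspect that topic with kafkacat to see what is stored for each connector (the broker address and topic name here are assumptions; check the worker's config.storage.topic setting):
# Dump the keys and values held in the Kafka Connect config topic
kafkacat -b localhost:9092 -t docker-connect-configs -C -f 'Key: %k\nValue: %s\n\n'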
Automatically restarting failed Kafka Connect tasks
Here’s a hacky way to automatically restart Kafka Connect connectors if they fail. Restarting automatically only makes sense if it’s a transient failure; if there’s a problem with your pipeline (e.g. bad records or a mis-configured server) then you don’t gain anything from this. You might want to check out Kafka Connect’s error handling and dead letter queues too.
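A rough sketch of the approach (the Connect REST endpoint and port are assumptions), which you could run from cron:
#!/usr/bin/env bash
# For every connector, restart any of its tasks that are in the FAILED state
for connector in $(curl -s http://localhost:8083/connectors | jq -r '.[]'); do
  curl -s "http://localhost:8083/connectors/${connector}/status" | \
    jq -r '.tasks[] | select(.state=="FAILED") | .id' | \
    while read -r task_id; do
      echo "Restarting failed task ${task_id} of connector ${connector}"
      curl -s -X POST "http://localhost:8083/connectors/${connector}/tasks/${task_id}/restart"
    done
done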
Putting Kafka Connect passwords in a separate file / externalising secrets
Kafka Connect configuration is easy - you just write some JSON! But what if you’ve got credentials that you need to pass? Embedding those in a config file is not always such a smart idea. Fortunately, with KIP-297, which was released in Apache Kafka 2.0, there is support for external secrets. It’s extensible so you can use your own ConfigProvider, and it ships with one for simply putting credentials in a file, which I’ll show here. You can read more here.
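As a minimal sketch (the file path and property names are made up for illustration), you tell the worker about the FileConfigProvider and then reference values from the file in the connector config:
# Connect worker properties
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
# /secrets/credentials.properties might contain:
#   db.password=Secret123
# and the connector config then references it without exposing the value:
#   "connection.password": "${file:/secrets/credentials.properties:db.password}"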
Deleting a Connector in Kafka Connect without the REST API
Kafka Connect exposes a REST interface through which all config and monitoring operations can be done. You can create connectors, delete them, restart them, check their status, and so on. But I found a situation recently in which I needed to delete a connector and couldn’t do so with the REST API. Here’s another way to do it, by amending the Kafka topic that Kafka Connect in distributed mode uses to persist connector configuration. Note that this is not a recommended way of working with Kafka Connect—the REST API is there for a good reason :)
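In essence, the config topic holds each connector's configuration under a key such as connector-<name>, and writing a NULL value (a tombstone) for that key removes the connector. A sketch with kafkacat (broker, topic, and connector names are assumptions):
# Produce a tombstone (NULL value) for the connector's key to the Connect config topic
echo 'connector-my-sink-connector:' | \
  kafkacat -b localhost:9092 -t docker-connect-configs -P -Z -K: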
A poor man’s KSQL EXPLODE/UNNEST technique
There is an open issue for support of EXPLODE/UNNEST functionality in KSQL, and if you need it then do up-vote the issue. Here I detail a hacky, but effective, workaround for exploding arrays into multiple messages—so long as you know the upper-bound on your array.
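The essence of the workaround, using hypothetical stream and column names and assuming the array holds at most three elements: create a stream from the first element, then INSERT INTO it once per remaining index, filtering out nulls. (Whether array indexing starts at 0 or 1 depends on your KSQL version.)
-- One output message per array element, up to a known maximum of three
CREATE STREAM EXPLODED AS
  SELECT ID, MY_ARRAY[0] AS ITEM FROM SOURCE_STREAM;
INSERT INTO EXPLODED
  SELECT ID, MY_ARRAY[1] AS ITEM FROM SOURCE_STREAM WHERE MY_ARRAY[1] IS NOT NULL;
INSERT INTO EXPLODED
  SELECT ID, MY_ARRAY[2] AS ITEM FROM SOURCE_STREAM WHERE MY_ARRAY[2] IS NOT NULL;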
When a Kafka Connect converter is not a converter
Kafka Connect is an API within Apache Kafka, and its modular nature makes it powerful and flexible. Converters are part of the API but not always fully understood. I’ve written previously about Kafka Connect converters, and this post is just a hands-on example to show even further what they are—and are not—about.
Note: To understand more about Kafka Connect in general, check out my talk from Kafka Summit London, From Zero to Hero with Kafka Connect.
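For reference, converters are configured on the worker or per connector; an illustrative (not prescriptive) combination might be:
# How Connect serialises/deserialises keys and values on the Kafka topic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081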
Reading Kafka Connect Offsets via the REST Proxy
When you run Kafka Connect in distributed mode it uses a Kafka topic to store the offset information for each connector. Because it’s just a Kafka topic, you can read that information using any consumer.
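A sketch of reading it through the REST Proxy (hostnames, consumer group, and instance names are made up; the topic is the default offset.storage.topic):
# Create a consumer instance on the REST Proxy
curl -s -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     -d '{"name":"offsets_reader","format":"json","auto.offset.reset":"earliest"}' \
     http://localhost:8082/consumers/connect_offsets_cg
# Subscribe it to the Kafka Connect offsets topic
curl -s -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     -d '{"topics":["connect-offsets"]}' \
     http://localhost:8082/consumers/connect_offsets_cg/instances/offsets_reader/subscription
# Fetch the stored offsets
curl -s -H "Accept: application/vnd.kafka.json.v2+json" \
     http://localhost:8082/consumers/connect_offsets_cg/instances/offsets_reader/records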
Pivoting Aggregates in KSQL
Prompted by a question on StackOverflow: the requirement is to take a series of events related to a common key and, for each key, output a set of aggregates derived from a changing value in the events. I’ll use the data from the question, based on ticket statuses. Each ticket can go through various stages, and the requirement was to show, per customer, how many tickets are currently at each stage.
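The core of the technique is a grouped aggregation with one CASE expression per stage, along these lines (stream, column, and stage names here are illustrative, and a complete solution would first reduce each ticket to its latest status):
-- One column per ticket stage, counted per customer
CREATE TABLE TICKETS_BY_STAGE AS
  SELECT CUSTOMER,
         SUM(CASE WHEN STATUS = 'OPEN'        THEN 1 ELSE 0 END) AS OPEN_COUNT,
         SUM(CASE WHEN STATUS = 'IN_PROGRESS' THEN 1 ELSE 0 END) AS IN_PROGRESS_COUNT,
         SUM(CASE WHEN STATUS = 'CLOSED'      THEN 1 ELSE 0 END) AS CLOSED_COUNT
  FROM TICKET_EVENTS
  GROUP BY CUSTOMER;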
Connecting KSQL to a Secured Schema Registry
Confluent Cloud now includes a secured Schema Registry, which you can use from external applications, including KSQL.
To configure KSQL for it you need to set:
ksql.schema.registry.url=https://<Schema Registry endpoint>
ksql.schema.registry.basic.auth.credentials.source=USER_INFO
ksql.schema.registry.basic.auth.user.info=<Schema Registry API Key>:<Schema Registry API Secret>
Exploring KSQL Stream-Stream Joins
Introduction
What can you use stream-stream joins for? Can you use them to join a stream of orders with a stream of related shipments and do useful things? What’s not supported in KSQL, and where are the cracks?
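The basic shape of a stream-stream join, with hypothetical stream names, uses a WITHIN clause to bound how far apart in time the two events may occur:
-- Join each order to any shipment that arrives within an hour of it
SELECT O.ORDER_ID, O.ITEM, S.SHIPMENT_ID
  FROM ORDERS O
       INNER JOIN SHIPMENTS S
         WITHIN 1 HOUR
         ON O.ORDER_ID = S.ORDER_ID;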
Terminate All KSQL Queries
Before you can drop a stream or table that’s populated by a query in KSQL, you have to terminate any queries upon which the object is dependent. Here’s a bit of jq & xargs magic to terminate all queries that are currently running.
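Something along these lines (the KSQL server address is an assumption):
# List the running queries, pull out their IDs, and TERMINATE each one
curl -s -X POST http://localhost:8088/ksql \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"SHOW QUERIES;"}' | \
  jq -r '.[].queries[].id' | \
  xargs -I{} curl -s -X POST http://localhost:8088/ksql \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{"ksql":"TERMINATE {};"}'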
Quick Thoughts on Not Making a Crap Slide Deck
This post is the companion to an earlier one that I wrote about conference abstracts. In the same way that the last one was inspired by reviewing a ton of abstracts and noticing a recurring pattern in my suggestions, so this one comes from reviewing a bunch of slide decks for a forthcoming conference. They all look like good talks, but in several cases these great talks are fighting to get out from underneath the deadening weight of slides.
Herewith follows my highly-opinionated, fairly-subjective, and extremely-terse advice and general suggestions for slide decks. You can also find related ramblings in this recent post. My friend and colleague Vik Gamov also wrote a good post on this same topic, and linked to a video that I’d recommend you watch.
Using httpie with the Kafka REST Proxy
This shows how to use httpie with the Confluent REST Proxy.
Send data
echo '{"records":[{"value":{"foo":"bar"}}]}' | \
http POST http://localhost:8082/topics/jsontest \
Content-Type:application/vnd.kafka.json.v2+json Accept:application/vnd.kafka.v2+json