add kafka client

main
Timo Bryant 2024-04-29 09:57:55 +02:00
parent 629728f865
commit 8a9732eff9
6 changed files with 115 additions and 0 deletions

View File

@ -0,0 +1,20 @@
plugins {
application
kotlin("jvm")
kotlin("plugin.serialization")
}
repositories {
mavenCentral()
}
application {
mainClass.set("de.itkl.kafkaAdmin.KafkaAdminApplicationKt.main")
}
dependencies {
implementation("org.apache.kafka:kafka-clients:3.4.0")
implementation("com.github.ajalt.clikt:clikt:4.4.0")
implementation(libs.bundles.kotlinx)
implementation(libs.bundles.logging)
}

View File

@ -0,0 +1,57 @@
package de.itkl.kafkaAdmin
import de.itkl.kafkaAdmin.KafkaAdmin.KafkaTopic
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.future.asDeferred
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.common.TopicCollection
import org.apache.kafka.common.Uuid
typealias ListOfTopics = List<KafkaTopic>
class KafkaAdmin(config: KafkaAdminConfig) {
private val client = Admin.create(config.intoProperties())
inner class KafkaTopic(
val uuid: Uuid,
val name: String,
) {
override fun toString(): String {
return "Topic uuid: $uuid, name: $name"
}
}
private fun ListOfTopics.toTopicCollection() = TopicCollection.ofTopicNames(map { it.name })
suspend fun listTopics(): List<KafkaTopic> {
val listing = client.listTopics()
return listing.listings().toCompletionStage().asDeferred().await()
.map { topicListing ->
KafkaTopic(
uuid = topicListing.topicId(),
name = topicListing.name(),
)
}
}
data class AlterTopicResult(val name: String, val error: Throwable?)
suspend fun deleteTopics(topics: ListOfTopics) =
coroutineScope {
client.deleteTopics(topics.toTopicCollection())
.topicNameValues()
.map { (name, future) ->
name to future.toCompletionStage().asDeferred()
}
.map { (name, future) ->
async {
val error = runCatching {
future.await()
}.exceptionOrNull()
name to error
}
}.awaitAll()
}
}

View File

@ -0,0 +1,22 @@
package de.itkl.kafkaAdmin
fun createClient(): KafkaAdmin {
val config = KafkaAdminConfig(bootstrapServers = "localhost:9092")
val admin = KafkaAdmin(config)
return admin
}
suspend fun run() {
val admin = createClient()
val topics = admin.listTopics()
val toDelete = topics
.filter { topic -> topic.name.startsWith("training-service-stream") }
admin.deleteTopics(toDelete)
.forEach { (topicName, error) ->
println("$topicName ${error ?: "success"}")
}
}
suspend fun main() {
run()
}

View File

@ -0,0 +1,14 @@
package de.itkl.kafkaAdmin
import org.apache.kafka.clients.admin.AdminClientConfig
import java.util.*
data class KafkaAdminConfig(
val bootstrapServers: String,
) {
fun intoProperties(): Properties {
return Properties().apply {
this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
}
}
}

View File

@ -0,0 +1 @@
package de.itkl.kafkaAdmin

View File

@ -16,3 +16,4 @@ fun includeDir(dir: String) {
rootProject.name = "xssak"
include("app")
includeDir("modules")
includeDir("apps")