diff --git a/apps/kafkaAdmin/build.gradle.kts b/apps/kafkaAdmin/build.gradle.kts new file mode 100644 index 0000000..568b1c1 --- /dev/null +++ b/apps/kafkaAdmin/build.gradle.kts @@ -0,0 +1,20 @@ +plugins { + application + kotlin("jvm") + kotlin("plugin.serialization") +} + +repositories { + mavenCentral() +} + +application { + mainClass.set("de.itkl.kafkaAdmin.KafkaAdminApplicationKt.main") +} + +dependencies { + implementation("org.apache.kafka:kafka-clients:3.4.0") + implementation("com.github.ajalt.clikt:clikt:4.4.0") + implementation(libs.bundles.kotlinx) + implementation(libs.bundles.logging) +} diff --git a/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdmin.kt b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdmin.kt new file mode 100644 index 0000000..540503d --- /dev/null +++ b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdmin.kt @@ -0,0 +1,57 @@ +package de.itkl.kafkaAdmin + +import de.itkl.kafkaAdmin.KafkaAdmin.KafkaTopic +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.future.asDeferred +import org.apache.kafka.clients.admin.Admin +import org.apache.kafka.common.TopicCollection +import org.apache.kafka.common.Uuid + +typealias ListOfTopics = List + +class KafkaAdmin(config: KafkaAdminConfig) { + private val client = Admin.create(config.intoProperties()) + + inner class KafkaTopic( + val uuid: Uuid, + val name: String, + ) { + override fun toString(): String { + return "Topic uuid: $uuid, name: $name" + } + } + + private fun ListOfTopics.toTopicCollection() = TopicCollection.ofTopicNames(map { it.name }) + + suspend fun listTopics(): List { + val listing = client.listTopics() + return listing.listings().toCompletionStage().asDeferred().await() + .map { topicListing -> + KafkaTopic( + uuid = topicListing.topicId(), + name = topicListing.name(), + ) + } + } + + data class AlterTopicResult(val name: String, val error: Throwable?) + + suspend fun deleteTopics(topics: ListOfTopics) = + coroutineScope { + client.deleteTopics(topics.toTopicCollection()) + .topicNameValues() + .map { (name, future) -> + name to future.toCompletionStage().asDeferred() + } + .map { (name, future) -> + async { + val error = runCatching { + future.await() + }.exceptionOrNull() + name to error + } + }.awaitAll() + } +} diff --git a/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdminApplication.kt b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdminApplication.kt new file mode 100644 index 0000000..012e4cf --- /dev/null +++ b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdminApplication.kt @@ -0,0 +1,22 @@ +package de.itkl.kafkaAdmin + +fun createClient(): KafkaAdmin { + val config = KafkaAdminConfig(bootstrapServers = "localhost:9092") + val admin = KafkaAdmin(config) + return admin +} + +suspend fun run() { + val admin = createClient() + val topics = admin.listTopics() + val toDelete = topics + .filter { topic -> topic.name.startsWith("training-service-stream") } + admin.deleteTopics(toDelete) + .forEach { (topicName, error) -> + println("$topicName ${error ?: "success"}") + } +} + +suspend fun main() { + run() +} diff --git a/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdminConfig.kt b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdminConfig.kt new file mode 100644 index 0000000..8053ac6 --- /dev/null +++ b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaAdminConfig.kt @@ -0,0 +1,14 @@ +package de.itkl.kafkaAdmin + +import org.apache.kafka.clients.admin.AdminClientConfig +import java.util.* + +data class KafkaAdminConfig( + val bootstrapServers: String, +) { + fun intoProperties(): Properties { + return Properties().apply { + this[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers + } + } +} diff --git a/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaTopic.kt b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaTopic.kt new file mode 100644 index 0000000..ee94571 --- /dev/null +++ b/apps/kafkaAdmin/src/main/kotlin/de/itkl/kafkaAdmin/KafkaTopic.kt @@ -0,0 +1 @@ +package de.itkl.kafkaAdmin diff --git a/settings.gradle.kts b/settings.gradle.kts index b1314ad..be27678 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -16,3 +16,4 @@ fun includeDir(dir: String) { rootProject.name = "xssak" include("app") includeDir("modules") +includeDir("apps")