From 4cafac458385e7194d4b1b5165b4026cbb9683ea Mon Sep 17 00:00:00 2001 From: Timo Bryant Date: Mon, 18 Dec 2023 22:55:29 +0100 Subject: [PATCH] refactoring into parallelUnordered method --- .../itkl/processing/ParallelFlowProcessor.kt | 37 ----------------- .../processing/UnorderedParallelFlowMap.kt | 41 +++++++++++++++++++ libraries/textprocessing/build.gradle.kts | 1 + .../de/itkl/textprocessing/BagOfWords.kt | 3 +- .../de/itkl/textprocessing/Histogram.kt | 14 +++++++ libraries/tfidf/build.gradle.kts | 1 + .../src/main/kotlin/de/itkl/tfidf/Idf.kt | 40 +++++------------- 7 files changed, 69 insertions(+), 68 deletions(-) delete mode 100644 libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt create mode 100644 libraries/fileprocessing/src/main/kotlin/de/itkl/processing/UnorderedParallelFlowMap.kt diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt deleted file mode 100644 index d63e02a..0000000 --- a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt +++ /dev/null @@ -1,37 +0,0 @@ -package de.itkl.processing - -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.withContext -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit - - -@Suppress("UNCHECKED_CAST") -class ParallelFlowProcessor( - private val mapperFn: (T) -> U) { - companion object { - private val workers = Executors.newWorkStealingPool(16) - } - - suspend fun process(flow: Flow): Flow { - TODO() -// flow.map { } -// return flow { -// flow.map { kotlinx.coroutines.Runnable { -// val result = mapperFn(it) -// runBlocking { emit(result) } -// } } -// .map { job -> workers.submit(job)} -// .toList() -// .forEach { future -> emit(future.get() as U) } -// withContext(Dispatchers.IO) { -// workers.awaitTermination(10000, TimeUnit.DAYS) -// } -// } - } -} \ No newline at end of file diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/UnorderedParallelFlowMap.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/UnorderedParallelFlowMap.kt new file mode 100644 index 0000000..e14a7ba --- /dev/null +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/UnorderedParallelFlowMap.kt @@ -0,0 +1,41 @@ +package de.itkl.processing + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.flow.* + +class ParallelUnorderedFlow( + private val producerJob: Job, + private val mapperFlow: Flow +) : Flow { + override suspend fun collect(collector: FlowCollector) { + mapperFlow.collect(collector) + producerJob.join() + } + +} + +suspend fun Flow.parallelUnordered( + scope: CoroutineScope, + numWorkers: Int, + mapperFn: (T) -> U): Flow { + + val producerChannel = Channel() + + val producersJob = scope.launch(Dispatchers.Default) { + collect { + producerChannel.send(it)} + } + + val mapperFlow = channelFlow { + (0..numWorkers).map { consumer -> + launch(Dispatchers.Default) { + producerChannel.consumeEach { + send(mapperFn(it)) + } + } + } + } + return ParallelUnorderedFlow(producersJob, mapperFlow) +} \ No newline at end of file diff --git a/libraries/textprocessing/build.gradle.kts b/libraries/textprocessing/build.gradle.kts index 3983b7d..e14ed29 100644 --- a/libraries/textprocessing/build.gradle.kts +++ b/libraries/textprocessing/build.gradle.kts @@ -5,5 +5,6 @@ plugins { dependencies { api("org.apache.lucene:lucene-analysis-common:9.9.0") implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.9.2") + implementation("com.google.guava:guava:32.1.3-jre") } diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt index 37022de..6ece780 100644 --- a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt @@ -16,7 +16,8 @@ class BagOfWords(private val data: MutableSet = mutableSetOf()) : Iterab } fun join(bagOfWords: BagOfWords): BagOfWords { - return BagOfWords(data.toMutableSet().apply { addAll(bagOfWords.data) }) + data.addAll(bagOfWords.data) + return this } override fun iterator(): Iterator { diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt index 3ba3689..0df3a78 100644 --- a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt @@ -11,6 +11,13 @@ class Histogram(private val histo: MutableMap = mutableMapOf()) : I } } + fun fromBagOfWords(bagOfWords: BagOfWords): Histogram { + val result = Histogram() + bagOfWords.forEach(result::add) + return result + } + + suspend fun fromBagOfWords(flow: Flow): Histogram { val result = Histogram() flow.collect() { value -> @@ -26,6 +33,13 @@ class Histogram(private val histo: MutableMap = mutableMapOf()) : I } } + fun join(other: Histogram): Histogram { + other.forEach { (word, count) -> + histo.merge(word, count) { a,b -> a + b } + } + return this + } + fun add(word: String) { histo.compute(word) { _, count -> count?.let { it + 1u } ?: 1u diff --git a/libraries/tfidf/build.gradle.kts b/libraries/tfidf/build.gradle.kts index de75945..5231d56 100644 --- a/libraries/tfidf/build.gradle.kts +++ b/libraries/tfidf/build.gradle.kts @@ -7,4 +7,5 @@ dependencies { api(project(":libraries:fileprocessing")) implementation("com.github.ajalt.mordant:mordant:2.2.0") implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.9.2") + implementation("com.google.guava:guava:32.1.3-jre") } diff --git a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt index e740237..4dc52d7 100644 --- a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt +++ b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt @@ -3,9 +3,8 @@ package de.itkl.tfidf import com.github.ajalt.mordant.terminal.Terminal import de.itkl.fileprocessing.FileProcessor import de.itkl.fileprocessing.Resource -import de.itkl.processing.ParallelFlowProcessor +import de.itkl.processing.parallelUnordered import de.itkl.textprocessing.* -import de.itkl.tfidf.Idf.Companion.count import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -22,10 +21,6 @@ private val Log = KotlinLogging.logger { } class Idf : FileProcessor { - companion object { - val count = AtomicInteger(0) - } - override fun willProduce(path: Path): Path { return path.parent.resolve(path.nameWithoutExtension + "-idf.csv") } @@ -33,35 +28,20 @@ class Idf : FileProcessor { override suspend fun process(resource: Resource): File = coroutineScope { Log.info { "Would produce: ${willProduce(resource.path)}" } val resultFile = willProduce(resource.path).toFile() - val channel = Channel>(0) - - launch { - TextFile(resource.read()) - .splitByEmptyLines() - .collect { - channel.send(it) - } - } - - val bagOfWords = channelFlow { - (0..16).map { - launch(Dispatchers.Default) { - channel.consumeEach { - val value = collectWordsOfDocument(it) - send(value) - } - } + val histogram = TextFile(resource.read()) + .splitByEmptyLines() + .parallelUnordered(this, 16) { doc -> + val result = collectWordsOfDocument(doc) + result } - } - - val histogram = Histogram.fromBagOfWords(bagOfWords) + .reduce { acc, other -> acc.join(other)} HistogramCsvStorage().save(histogram, resultFile) resultFile } - private fun collectWordsOfDocument(document: List): BagOfWords { + private fun collectWordsOfDocument(document: List): Histogram { if (document.isEmpty()) { - return BagOfWords() + return Histogram() } val tokenizer = Tokenizer() val bagOfWords = document.map { line -> @@ -69,7 +49,7 @@ class Idf : FileProcessor { BagOfWords.from(tokens) } .reduce { acc, bagOfWords -> acc.join(bagOfWords) } - return bagOfWords + return Histogram.fromBagOfWords(bagOfWords) } } \ No newline at end of file