refactoring into parallelUnordered method

develop
Timo Bryant 2023-12-18 22:55:29 +01:00
parent 13110fa8e5
commit 4cafac4583
7 changed files with 69 additions and 68 deletions
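In short: the stubbed, executor-based ParallelFlowProcessor is deleted and replaced by a channel-backed parallelUnordered extension on Flow; BagOfWords.join becomes a mutating accumulator; Histogram gains fromBagOfWords(BagOfWords) and join; and Idf.process is rewritten on top of the new operator.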

de/itkl/processing/ParallelFlowProcessor.kt (View File)

@@ -1,37 +0,0 @@
-package de.itkl.processing
-
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.flow
-import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.toList
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
-import java.util.concurrent.Executors
-import java.util.concurrent.TimeUnit
-
-@Suppress("UNCHECKED_CAST")
-class ParallelFlowProcessor<T, U>(
-    private val mapperFn: (T) -> U) {
-
-    companion object {
-        private val workers = Executors.newWorkStealingPool(16)
-    }
-
-    suspend fun process(flow: Flow<T>): Flow<U> {
-        TODO()
-        // flow.map { }
-        // return flow {
-        //     flow.map { kotlinx.coroutines.Runnable {
-        //         val result = mapperFn(it)
-        //         runBlocking { emit(result) }
-        //     } }
-        //         .map { job -> workers.submit(job) }
-        //         .toList()
-        //         .forEach { future -> emit(future.get() as U) }
-        //     withContext(Dispatchers.IO) {
-        //         workers.awaitTermination(10000, TimeUnit.DAYS)
-        //     }
-        // }
-    }
-}
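For context: the deleted processor never got past TODO(). Its commented-out draft submitted Runnables to a work-stealing pool and called emit via runBlocking from pool threads, which the flow {} builder forbids: emission must happen in the coroutine that runs the builder, or collection fails with "Flow invariant is violated". A minimal illustration of that constraint (not from the repo):

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

fun main() = runBlocking {
    flow {
        // Emitting from a foreign thread/coroutine breaks context preservation;
        // kotlinx.coroutines throws IllegalStateException in the worker thread.
        thread { runBlocking { emit(42) } }.join()
    }.collect { println(it) }
}
```

The replacement below avoids the problem by having workers send into a channelFlow, which is designed for concurrent emission.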

de/itkl/processing/ParallelUnorderedFlow.kt (View File)

@@ -0,0 +1,41 @@
+package de.itkl.processing
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.flow.*
+
+// Wraps the mapper flow so that collecting it also waits for the
+// upstream producer to finish before the flow completes.
+class ParallelUnorderedFlow<U>(
+    private val producerJob: Job,
+    private val mapperFlow: Flow<U>
+) : Flow<U> {
+
+    override suspend fun collect(collector: FlowCollector<U>) {
+        mapperFlow.collect(collector)
+        producerJob.join()
+    }
+}
+
+// Maps each upstream element with mapperFn on numWorkers concurrent workers.
+// Results are emitted in completion order, not in upstream order.
+suspend fun <T : Any, U : Any> Flow<T>.parallelUnordered(
+    scope: CoroutineScope,
+    numWorkers: Int,
+    mapperFn: (T) -> U
+): Flow<U> {
+    val producerChannel = Channel<T>()
+    val producerJob = scope.launch(Dispatchers.Default) {
+        collect { producerChannel.send(it) }
+        // Close the channel so the workers' consumeEach loops can terminate.
+        producerChannel.close()
+    }
+    val mapperFlow = channelFlow {
+        repeat(numWorkers) {
+            launch(Dispatchers.Default) {
+                producerChannel.consumeEach { send(mapperFn(it)) }
+            }
+        }
+    }
+    return ParallelUnorderedFlow(producerJob, mapperFlow)
+}
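A rough usage sketch of the new operator (range, worker count, and the squaring mapper are invented for illustration):

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // 4 workers map the elements concurrently; results arrive in completion order.
    val results = (1..10).asFlow()
        .parallelUnordered(this, numWorkers = 4) { it * it }
        .toList()
    println(results.sorted()) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
}
```

Unordered delivery is the point of the design: one slow element does not hold back results that finish after it, at the cost of losing upstream order.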

build.gradle.kts (View File)

@@ -5,5 +5,6 @@ plugins {
 dependencies {
     api("org.apache.lucene:lucene-analysis-common:9.9.0")
     implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.9.2")
+    implementation("com.google.guava:guava:32.1.3-jre")
 }

de/itkl/textprocessing/BagOfWords.kt (View File)

@@ -16,7 +16,8 @@ class BagOfWords(private val data: MutableSet<String> = mutableSetOf()) : Iterab
     }
 
     fun join(bagOfWords: BagOfWords): BagOfWords {
-        return BagOfWords(data.toMutableSet().apply { addAll(bagOfWords.data) })
+        data.addAll(bagOfWords.data)
+        return this
     }
 
     override fun iterator(): Iterator<String> {
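Note that join changed semantics, not just shape: it now mutates the receiver and returns this rather than allocating a new BagOfWords per step, so reduce-style folds reuse a single accumulator. A small sketch of the new behavior (word literals invented, assuming from accepts a token list as in Idf.kt):

```kotlin
val a = BagOfWords.from(listOf("foo"))
val b = BagOfWords.from(listOf("bar"))
val joined = a.join(b)
// joined === a, and a now contains both "foo" and "bar"; b is unchanged.
```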

de/itkl/textprocessing/Histogram.kt (View File)

@@ -11,6 +11,13 @@ class Histogram(private val histo: MutableMap<String,UInt> = mutableMapOf()) : I
         }
     }
 
+    fun fromBagOfWords(bagOfWords: BagOfWords): Histogram {
+        val result = Histogram()
+        bagOfWords.forEach(result::add)
+        return result
+    }
+
     suspend fun fromBagOfWords(flow: Flow<BagOfWords>): Histogram {
         val result = Histogram()
         flow.collect { value ->
@@ -26,6 +33,13 @@ class Histogram(private val histo: MutableMap<String,UInt> = mutableMapOf()) : I
         }
     }
 
+    fun join(other: Histogram): Histogram {
+        other.forEach { (word, count) ->
+            histo.merge(word, count) { a, b -> a + b }
+        }
+        return this
+    }
+
     fun add(word: String) {
         histo.compute(word) { _, count ->
             count?.let { it + 1u } ?: 1u
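Together the two additions let callers build a histogram per document and fold them pairwise. A quick sketch (words invented; BagOfWords de-duplicates, so each word counts at most once per document):

```kotlin
val h1 = Histogram.fromBagOfWords(BagOfWords.from(listOf("foo", "bar")))
val h2 = Histogram.fromBagOfWords(BagOfWords.from(listOf("bar")))
h1.join(h2) // h1 now holds "foo" -> 1u and "bar" -> 2u
```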

build.gradle.kts (View File)

@@ -7,4 +7,5 @@ dependencies {
     api(project(":libraries:fileprocessing"))
     implementation("com.github.ajalt.mordant:mordant:2.2.0")
     implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.9.2")
+    implementation("com.google.guava:guava:32.1.3-jre")
 }

de/itkl/tfidf/Idf.kt (View File)

@@ -3,9 +3,8 @@ package de.itkl.tfidf
 import com.github.ajalt.mordant.terminal.Terminal
 import de.itkl.fileprocessing.FileProcessor
 import de.itkl.fileprocessing.Resource
-import de.itkl.processing.ParallelFlowProcessor
+import de.itkl.processing.parallelUnordered
 import de.itkl.textprocessing.*
-import de.itkl.tfidf.Idf.Companion.count
 import io.github.oshai.kotlinlogging.KotlinLogging
 import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.Channel
@@ -22,10 +21,6 @@ private val Log = KotlinLogging.logger { }
 class Idf : FileProcessor {
 
-    companion object {
-        val count = AtomicInteger(0)
-    }
-
     override fun willProduce(path: Path): Path {
         return path.parent.resolve(path.nameWithoutExtension + "-idf.csv")
     }
@@ -33,35 +28,20 @@ class Idf : FileProcessor {
     override suspend fun process(resource: Resource): File = coroutineScope {
         Log.info { "Would produce: ${willProduce(resource.path)}" }
         val resultFile = willProduce(resource.path).toFile()
-        val channel = Channel<List<String>>(0)
-        launch {
-            TextFile(resource.read())
-                .splitByEmptyLines()
-                .collect {
-                    channel.send(it)
-                }
-        }
-        val bagOfWords = channelFlow {
-            (0..16).map {
-                launch(Dispatchers.Default) {
-                    channel.consumeEach {
-                        val value = collectWordsOfDocument(it)
-                        send(value)
-                    }
-                }
-            }
-        }
-        val histogram = Histogram.fromBagOfWords(bagOfWords)
+        val histogram = TextFile(resource.read())
+            .splitByEmptyLines()
+            .parallelUnordered(this, 16) { doc -> collectWordsOfDocument(doc) }
+            .reduce { acc, other -> acc.join(other) }
         HistogramCsvStorage().save(histogram, resultFile)
         resultFile
     }
 
-    private fun collectWordsOfDocument(document: List<String>): BagOfWords {
+    private fun collectWordsOfDocument(document: List<String>): Histogram {
         if (document.isEmpty()) {
-            return BagOfWords()
+            return Histogram()
         }
         val tokenizer = Tokenizer()
         val bagOfWords = document.map { line ->
@@ -69,7 +49,7 @@ class Idf : FileProcessor {
             BagOfWords.from(tokens)
         }
             .reduce { acc, bagOfWords -> acc.join(bagOfWords) }
-        return bagOfWords
+        return Histogram.fromBagOfWords(bagOfWords)
     }
 }
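Net effect: because each document contributes a de-duplicated BagOfWords, the reduced histogram is a document-frequency table, which is exactly what an IDF computation needs. The usual next step would be something like the helper below (illustrative, not part of this commit):

```kotlin
import kotlin.math.ln

// Classic IDF: rare words score high, ubiquitous words approach zero.
fun idf(totalDocs: Int, docFrequency: UInt): Double =
    ln(totalDocs.toDouble() / docFrequency.toDouble())
```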