parallelizing finally works

develop
Timo Bryant 2023-12-18 21:59:15 +01:00
parent 71e066fcde
commit 13110fa8e5
4 changed files with 63 additions and 32 deletions

View File

@@ -21,6 +21,7 @@ abstract class FileProcessingPipeline {
             val resource = FileResource(currentFile)
             val progress = ProgressResource(resource, progressBarFactory)
             processor.process(progress)
+            Log.info { "File created: $target" }
         }
         currentFile = target.toFile()
     }

View File

@@ -19,17 +19,19 @@ class ParallelFlowProcessor<T,U>(
     }
     suspend fun process(flow: Flow<T>): Flow<U> {
-        return flow {
-            flow.map { kotlinx.coroutines.Runnable {
-                val result = mapperFn(it)
-                runBlocking { emit(result) }
-            } }
-                .map { job -> workers.submit(job)}
-                .toList()
-                .forEach { future -> emit(future.get() as U) }
-            withContext(Dispatchers.IO) {
-                workers.awaitTermination(10000, TimeUnit.DAYS)
-            }
-        }
+        TODO()
+        // flow.map { }
+        // return flow {
+        //     flow.map { kotlinx.coroutines.Runnable {
+        //         val result = mapperFn(it)
+        //         runBlocking { emit(result) }
+        //     } }
+        //         .map { job -> workers.submit(job)}
+        //         .toList()
+        //         .forEach { future -> emit(future.get() as U) }
+        //     withContext(Dispatchers.IO) {
+        //         workers.awaitTermination(10000, TimeUnit.DAYS)
+        //     }
+        // }
     }
 }
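With process() reduced to TODO() here, the parallel mapping this class was meant to provide now lives in Idf (the last file in this commit). For reference, a minimal sketch of one coroutine-native way the method could be filled in, written as a standalone extension function; the suspend (T) -> U shape of mapperFn and the worker count are assumptions made for the sketch, not code from the repository:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

// Sketch: fan the upstream elements out to `workers` coroutines and re-emit the
// mapped results (in completion order, not input order).
fun <T, U> Flow<T>.parallelMap(workers: Int, mapperFn: suspend (T) -> U): Flow<U> = channelFlow {
    val work = Channel<T>(Channel.RENDEZVOUS)
    // Producer: forward the upstream flow into the work channel, then close it so the
    // worker loops below (and with them this channelFlow) can complete.
    launch {
        this@parallelMap.collect { work.send(it) }
        work.close()
    }
    // Workers: pull from the shared channel, map on the Default dispatcher, send downstream.
    repeat(workers) {
        launch(Dispatchers.Default) {
            for (element in work) send(mapperFn(element))
        }
    }
}

process(flow) could then simply return flow.parallelMap(workers, mapperFn) once mapperFn's type matches.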

View File

@@ -13,8 +13,7 @@ class Histogram(private val histo: MutableMap<String,UInt> = mutableMapOf()) : I
     suspend fun fromBagOfWords(flow: Flow<BagOfWords>): Histogram {
         val result = Histogram()
-        flow.collectIndexed { index, value ->
-            println(index)
+        flow.collect() { value ->
             value.forEach(result::add)
         }
         return result
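A small usage sketch of fromBagOfWords above, assuming Tokenizer, BagOfWords and Histogram from this repository are on the classpath (their package imports are omitted) and using made-up sample lines:

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val tokenizer = Tokenizer()
    // Build one BagOfWords per line and feed them through as a Flow.
    val bags = listOf("the quick brown fox", "the lazy dog").asFlow()
        .map { line -> BagOfWords.from(tokenizer.tokenize(line)) }
    // fromBagOfWords collects the whole flow and aggregates every bag into one Histogram.
    val histogram = Histogram.fromBagOfWords(bags)
    println(histogram)
}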

View File

@@ -5,42 +5,71 @@ import de.itkl.fileprocessing.FileProcessor
 import de.itkl.fileprocessing.Resource
 import de.itkl.processing.ParallelFlowProcessor
 import de.itkl.textprocessing.*
+import de.itkl.tfidf.Idf.Companion.count
 import io.github.oshai.kotlinlogging.KotlinLogging
-import kotlinx.coroutines.flow.map
-import kotlinx.coroutines.flow.reduce
-import kotlinx.coroutines.flow.take
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.sync.Semaphore
+import kotlinx.coroutines.sync.withPermit
 import java.io.File
 import java.nio.file.Path
+import java.util.concurrent.atomic.AtomicInteger
 import kotlin.io.path.nameWithoutExtension
 private val Log = KotlinLogging.logger { }
 class Idf : FileProcessor {
+    companion object {
+        val count = AtomicInteger(0)
+    }
     override fun willProduce(path: Path): Path {
         return path.parent.resolve(path.nameWithoutExtension + "-idf.csv")
     }
-    override suspend fun process(resource: Resource): File {
+    override suspend fun process(resource: Resource): File = coroutineScope {
         Log.info { "Would produce: ${willProduce(resource.path)}" }
         val resultFile = willProduce(resource.path).toFile()
-        val textFile = TextFile(resource.read())
-        val documents = textFile.splitByEmptyLines()
-        val bagOfWords = ParallelFlowProcessor<List<String>, BagOfWords>(
-            mapperFn = { document ->
+        val channel = Channel<List<String>>(0)
+        launch {
+            TextFile(resource.read())
+                .splitByEmptyLines()
+                .collect {
+                    channel.send(it)
+                }
+        }
+        val bagOfWords = channelFlow {
+            (0..16).map {
+                launch(Dispatchers.Default) {
+                    channel.consumeEach {
+                        val value = collectWordsOfDocument(it)
+                        send(value)
+                    }
+                }
+            }
+        }
+        val histogram = Histogram.fromBagOfWords(bagOfWords)
+        HistogramCsvStorage().save(histogram, resultFile)
+        resultFile
+    }
+    private fun collectWordsOfDocument(document: List<String>): BagOfWords {
        if (document.isEmpty()) {
            return BagOfWords()
        }
        val tokenizer = Tokenizer()
        val bagOfWords = document.map { line ->
            val tokens = tokenizer.tokenize(line)
            BagOfWords.from(tokens)
        }
            .reduce { acc, bagOfWords -> acc.join(bagOfWords) }
-                bagOfWords
-            }
-        ).process(documents)
-        val histogram = Histogram.fromBagOfWords(bagOfWords)
-        HistogramCsvStorage().save(histogram, resultFile)
-        return resultFile
+        return bagOfWords
    }
 }
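The heart of the change above is a fan-out: one producer coroutine pushes documents into a rendezvous Channel, and 17 worker coroutines inside channelFlow turn each document into a BagOfWords. Two library behaviours matter when reading it: consumeEach returns only once the channel is closed, and a channelFlow completes only after every coroutine launched inside it has completed, so where (and whether) the channel gets closed decides when the bagOfWords flow terminates. A tiny standalone sketch of that mechanism, with made-up data:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>(0) // rendezvous channel, like Channel<List<String>>(0) above
    val workers = (1..4).map { id ->
        launch {
            // consumeEach suspends while the channel is empty and returns only after close().
            channel.consumeEach { println("worker $id got $it") }
        }
    }
    launch {
        (1..10).forEach { channel.send(it) }
        channel.close() // without this, the workers (and anything joining them) never finish
    }
    workers.forEach { it.join() }
}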