diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt
index 0564950..0318753 100644
--- a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt
+++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt
@@ -1,25 +1,31 @@
 package de.itkl.processing
 
-import kotlinx.coroutines.async
-import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.channels.consumeEach
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.sync.Semaphore
-import kotlinx.coroutines.sync.withPermit
-import kotlin.coroutines.coroutineContext
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.withContext
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
 
+
+@Suppress("UNCHECKED_CAST")
 class ParallelFlowProcessor<T, U>(
-    private val mapperFn: (T) -> U,
-    private val concurrencyLimit: Int) {
-    suspend fun process(flow: Flow<T>): Flow<U> = coroutineScope {
-        val gate = Semaphore(concurrencyLimit)
-        flow.map { item ->
-            async {
-                gate.withPermit { mapperFn(item) }
+    private val mapperFn: (T) -> U) {
+    companion object {
+        private val workers = Executors.newWorkStealingPool(16)
+    }
+
+    suspend fun process(flow: Flow<T>): Flow<U> {
+        return flow {
+            flow.map { kotlinx.coroutines.Runnable { mapperFn(it) } }
+                .map { job -> workers.submit(job)}
+                .toList()
+                .forEach { future -> emit(future.get() as U) }
+            withContext(Dispatchers.IO) {
+                workers.awaitTermination(10000, TimeUnit.DAYS)
             }
         }
-            .map { it.await() }
     }
 }
\ No newline at end of file
diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt
index 8998e9e..37022de 100644
--- a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt
+++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt
@@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.toSet
 
 class BagOfWords(private val data: MutableSet<String> = mutableSetOf()) : Iterable<String> {
     companion object {
-        suspend fun from(flow: Flow<String>): BagOfWords {
+        fun from(flow: Sequence<String>): BagOfWords {
             return BagOfWords(flow.toSet().toMutableSet())
         }
     }
diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Tokenizer.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Tokenizer.kt
index c7aa1c5..bd609cc 100644
--- a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Tokenizer.kt
+++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Tokenizer.kt
@@ -9,36 +9,23 @@ import org.apache.lucene.util.AttributeFactory
 import java.io.StringReader
 
 
-class Tokenizer : AutoCloseable{
+class Tokenizer {
     private val tokenizer by lazy {
         val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
         val tokenizer = StandardTokenizer(factory)
         tokenizer
-//        val reader = ProgressInputStream(file.inputStream(), progressOp)
-//        tokenizer.setReader(InputStreamReader(reader))
-//        tokenizer.reset()
-//        val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
-//        return flow {
-//            while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true } ) {
-//                emit(attr.toString())
-//            }
-//        }.onCompletion {
-//            tokenizer.close()
-//        }
     }
 
-    fun tokenize(input: String): Flow<String> {
+    fun tokenize(input: String): Sequence<String> {
         val reader = StringReader(input)
         tokenizer.setReader(reader)
         tokenizer.reset()
         val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
-        return flow {
+        return sequence {
             while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true }) {
-                emit(attr.toString())
+                yield(attr.toString())
             }
-        }.onCompletion { tokenizer.close() }
-    }
-    override fun close() {
-        tokenizer.close()
+            tokenizer.close()
+        }
     }
 }
\ No newline at end of file
diff --git a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt
index 1d358be..65a616f 100644
--- a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt
+++ b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt
@@ -2,6 +2,7 @@ package de.itkl.tfidf
 
 import de.itkl.fileprocessing.FileProcessor
 import de.itkl.fileprocessing.Resource
+import de.itkl.processing.ParallelFlowProcessor
 import de.itkl.textprocessing.*
 import io.github.oshai.kotlinlogging.KotlinLogging
 import kotlinx.coroutines.flow.map
@@ -22,16 +23,20 @@ class Idf : FileProcessor {
         Log.info { "Would produce: ${willProduce(resource.path)}" }
         val resultFile = willProduce(resource.path).toFile()
         val textFile = TextFile(resource.toFile())
-        val bagOfWords = textFile.splitByEmptyLines { }
-            .take(10)
-            .map { document ->
+        val documents = textFile.splitByEmptyLines { }
+        val bagOfWords = ParallelFlowProcessor<Sequence<String>, BagOfWords>(
+            mapperFn = { document ->
                 val tokenizer = Tokenizer()
-                document.map { line ->
+                val bagOfWords = document.map { line ->
                     val tokens = tokenizer.tokenize(line)
                     BagOfWords.from(tokens)
                 }
                     .reduce { acc, bagOfWords -> acc.join(bagOfWords) }
+                bagOfWords
             }
+        ).process(documents)
+
+
         val histogram = Histogram.fromBagOfWords(bagOfWords)
         HistogramCsvStorage().save(histogram, resultFile)
         return resultFile
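Usage note (illustrative only, not part of the change set): a minimal sketch of how the changed pieces fit together after this diff, assuming the caller is already in a suspend context and that splitByEmptyLines emits each document as a Sequence<String>. The element type and the helper name bagOfWordsPerDocument are assumptions for the sketch, not taken from the diff.

import de.itkl.processing.ParallelFlowProcessor
import de.itkl.textprocessing.BagOfWords
import de.itkl.textprocessing.Tokenizer
import kotlinx.coroutines.flow.Flow

// Hypothetical helper: fans each document out through ParallelFlowProcessor's shared executor,
// tokenizes line by line with the now Sequence-based Tokenizer, and folds the per-line bags
// into one BagOfWords per document, mirroring the mapperFn wired up in Idf.process above.
suspend fun bagOfWordsPerDocument(documents: Flow<Sequence<String>>): Flow<BagOfWords> =
    ParallelFlowProcessor<Sequence<String>, BagOfWords>(
        mapperFn = { document ->
            val tokenizer = Tokenizer()
            document.map { line -> BagOfWords.from(tokenizer.tokenize(line)) }
                .reduce { acc, bag -> acc.join(bag) }
        }
    ).process(documents)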