diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProcessingPipeline.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProcessingPipeline.kt index ad8fe2a..36039a6 100644 --- a/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProcessingPipeline.kt +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProcessingPipeline.kt @@ -21,6 +21,7 @@ abstract class FileProcessingPipeline { val resource = FileResource(currentFile) val progress = ProgressResource(resource, progressBarFactory) processor.process(progress) + Log.info { "File created: $target" } } currentFile = target.toFile() } diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt index 284304c..d63e02a 100644 --- a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt @@ -19,17 +19,19 @@ class ParallelFlowProcessor( } suspend fun process(flow: Flow): Flow { - return flow { - flow.map { kotlinx.coroutines.Runnable { - val result = mapperFn(it) - runBlocking { emit(result) } - } } - .map { job -> workers.submit(job)} - .toList() - .forEach { future -> emit(future.get() as U) } - withContext(Dispatchers.IO) { - workers.awaitTermination(10000, TimeUnit.DAYS) - } - } + TODO() +// flow.map { } +// return flow { +// flow.map { kotlinx.coroutines.Runnable { +// val result = mapperFn(it) +// runBlocking { emit(result) } +// } } +// .map { job -> workers.submit(job)} +// .toList() +// .forEach { future -> emit(future.get() as U) } +// withContext(Dispatchers.IO) { +// workers.awaitTermination(10000, TimeUnit.DAYS) +// } +// } } } \ No newline at end of file diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt index 87ee5de..3ba3689 100644 --- a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt @@ -13,8 +13,7 @@ class Histogram(private val histo: MutableMap = mutableMapOf()) : I suspend fun fromBagOfWords(flow: Flow): Histogram { val result = Histogram() - flow.collectIndexed { index, value -> - println(index) + flow.collect() { value -> value.forEach(result::add) } return result diff --git a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt index 82b4b8b..e740237 100644 --- a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt +++ b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt @@ -5,42 +5,71 @@ import de.itkl.fileprocessing.FileProcessor import de.itkl.fileprocessing.Resource import de.itkl.processing.ParallelFlowProcessor import de.itkl.textprocessing.* +import de.itkl.tfidf.Idf.Companion.count import io.github.oshai.kotlinlogging.KotlinLogging -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.reduce -import kotlinx.coroutines.flow.take +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import java.io.File import java.nio.file.Path +import java.util.concurrent.atomic.AtomicInteger import kotlin.io.path.nameWithoutExtension private val Log = KotlinLogging.logger { } class Idf : FileProcessor { + + companion object { + val count = AtomicInteger(0) + } + override fun willProduce(path: Path): Path { return path.parent.resolve(path.nameWithoutExtension + "-idf.csv") } - override suspend fun process(resource: Resource): File { + override suspend fun process(resource: Resource): File = coroutineScope { Log.info { "Would produce: ${willProduce(resource.path)}" } val resultFile = willProduce(resource.path).toFile() - val textFile = TextFile(resource.read()) - val documents = textFile.splitByEmptyLines() - val bagOfWords = ParallelFlowProcessor, BagOfWords>( - mapperFn = { document -> - val tokenizer = Tokenizer() - val bagOfWords = document.map { line -> - val tokens = tokenizer.tokenize(line) - BagOfWords.from(tokens) - } - .reduce { acc, bagOfWords -> acc.join(bagOfWords) } - bagOfWords - } - ).process(documents) + val channel = Channel>(0) + launch { + TextFile(resource.read()) + .splitByEmptyLines() + .collect { + channel.send(it) + } + } + + val bagOfWords = channelFlow { + (0..16).map { + launch(Dispatchers.Default) { + channel.consumeEach { + val value = collectWordsOfDocument(it) + send(value) + } + } + } + } val histogram = Histogram.fromBagOfWords(bagOfWords) HistogramCsvStorage().save(histogram, resultFile) - return resultFile + resultFile + } + + private fun collectWordsOfDocument(document: List): BagOfWords { + if (document.isEmpty()) { + return BagOfWords() + } + val tokenizer = Tokenizer() + val bagOfWords = document.map { line -> + val tokens = tokenizer.tokenize(line) + BagOfWords.from(tokens) + } + .reduce { acc, bagOfWords -> acc.join(bagOfWords) } + return bagOfWords } } \ No newline at end of file