diff --git a/app/src/main/kotlin/docthor/app/App.kt b/app/src/main/kotlin/docthor/app/App.kt index 8426865..388d7b7 100644 --- a/app/src/main/kotlin/docthor/app/App.kt +++ b/app/src/main/kotlin/docthor/app/App.kt @@ -6,10 +6,12 @@ import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.types.enum import com.github.ajalt.clikt.parameters.types.file +import de.itkl.textprocessing.TextFile import de.itkl.tfidf.Language import de.itkl.tfidf.TfIdf +import de.itkl.tfidf.TfIdfPipeline +import kotlinx.coroutines.flow.take import kotlinx.coroutines.runBlocking -import java.io.File class ComputeTf : CliktCommand() { private val corpus by option(help = "corpus") @@ -20,13 +22,17 @@ class ComputeTf : CliktCommand() { .required() override fun run() = runBlocking { - val tfIdf = TfIdf() - val histogram = tfIdf.computeTf( - corpus, - language - ) - tfIdf.normalizeTf(histogram, corpus.toPath().parent.resolve("${corpus.nameWithoutExtension}-tf.csv").toFile()) - + TfIdfPipeline(language = Language.DE) + .input(corpus) +// TextFile(corpus).splitByEmptyLines() +// .take(10) +// .collect { println(it) } +// val tfIdf = TfIdf() +// val histogram = tfIdf.computeTf( +// corpus, +// language +// ) +// val tf = tfIdf.normalizeTf(histogram, corpus.toPath().parent.resolve("${corpus.nameWithoutExtension}-tf.csv").toFile()) } } diff --git a/libraries/fileprocessing/build.gradle.kts b/libraries/fileprocessing/build.gradle.kts new file mode 100644 index 0000000..c74f63c --- /dev/null +++ b/libraries/fileprocessing/build.gradle.kts @@ -0,0 +1,7 @@ +plugins { + id("docthor.kotlin-library-conventions") +} + +dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3") +} diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/FileProcessor.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/FileProcessor.kt new file mode 100644 index 0000000..3f156a0 --- /dev/null +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/FileProcessor.kt @@ -0,0 +1,10 @@ +package de.itkl.fileprocessing + +import java.io.File +import java.io.InputStream +import java.nio.file.Path + +interface FileProcessor { + fun willProduce(path: Path): Path + suspend fun process(resource: Resource): File +} \ No newline at end of file diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProcessingPipeline.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProcessingPipeline.kt new file mode 100644 index 0000000..04a7872 --- /dev/null +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProcessingPipeline.kt @@ -0,0 +1,25 @@ +package de.itkl.fileprocessing + +import io.github.oshai.kotlinlogging.KotlinLogging +import java.io.File +import kotlin.io.path.exists + +private val Log = KotlinLogging.logger { } + +abstract class FileProcessingPipeline { + + protected abstract val fileProcessor: List + suspend fun input(file: File) { + var currentFile = file + fileProcessor.forEach { processor -> + val target = processor.willProduce(currentFile.toPath()) + if(target.exists()) { + Log.info { "$target exists. Skipping" } + } else { + Log.info { "$target does not exists. Creating" } + processor.process(FileResource(currentFile)) + } + currentFile = target.toFile() + } + } +} \ No newline at end of file diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProgressInputStream.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProgressInputStream.kt new file mode 100644 index 0000000..fceb3bb --- /dev/null +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/ProgressInputStream.kt @@ -0,0 +1,39 @@ +package de.itkl.fileprocessing + +import java.io.InputStream + +/** + * Represents an input stream that tracks the progress of reading from an underlying input stream. + * + * @property inputStream The underlying input stream to read from. + * @property updateOp The operation to be executed when the number of bytes read changes. + * @property bytesRead The number of bytes read from the input stream. + */ +class ProgressInputStream( + private val inputStream: InputStream, + private val updateOp: (Long) -> Unit) : InputStream() { + @Volatile + var bytesRead: Long = 0 + private set(value) { + field = value + updateOp(value) + } + + override fun read(): Int { + val byte = inputStream.read() + if (byte != -1) { + bytesRead++ + } + return byte + } + override fun read(b: ByteArray, off: Int, len: Int): Int { + val bytesRead = inputStream.read(b, off, len) + if (bytesRead != -1) { + this.bytesRead += bytesRead + } + return bytesRead + } + override fun read(b: ByteArray): Int { + return this.read(b, 0, b.size) + } +} \ No newline at end of file diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/Resource.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/Resource.kt new file mode 100644 index 0000000..4955bdd --- /dev/null +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/fileprocessing/Resource.kt @@ -0,0 +1,34 @@ +package de.itkl.fileprocessing + +import java.io.File +import java.io.InputStream +import java.nio.file.Files +import java.nio.file.Path + +interface Resource { + val path: Path + val size: Long + fun toFile(): File = path.toFile() + fun read(): InputStream +} + +class ProgressResource( + private val resource: Resource, + private val progressOpSupplier: () -> (Long) -> Unit +) : Resource by resource +{ + override fun read(): InputStream { + return ProgressInputStream( + read(), + progressOpSupplier() + ) + } +} + +class FileResource(override val path: Path) : Resource { + constructor(file: File): this(file.toPath()) + override val size: Long by lazy { path.toFile().length() } + override fun read(): InputStream { + return Files.newInputStream(path) + } +} \ No newline at end of file diff --git a/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt new file mode 100644 index 0000000..0564950 --- /dev/null +++ b/libraries/fileprocessing/src/main/kotlin/de/itkl/processing/ParallelFlowProcessor.kt @@ -0,0 +1,25 @@ +package de.itkl.processing + +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit +import kotlin.coroutines.coroutineContext + +class ParallelFlowProcessor( + private val mapperFn: (T) -> U, + private val concurrencyLimit: Int) { + suspend fun process(flow: Flow): Flow = coroutineScope { + val gate = Semaphore(concurrencyLimit) + flow.map { item -> + async { + gate.withPermit { mapperFn(item) } + } + } + .map { it.await() } + } +} \ No newline at end of file diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt new file mode 100644 index 0000000..8998e9e --- /dev/null +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/BagOfWords.kt @@ -0,0 +1,33 @@ +package de.itkl.textprocessing + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.toSet + +class BagOfWords(private val data: MutableSet = mutableSetOf()) : Iterable { + + companion object { + suspend fun from(flow: Flow): BagOfWords { + return BagOfWords(flow.toSet().toMutableSet()) + } + } + + fun add(word: String) { + data.add(word) + } + + fun join(bagOfWords: BagOfWords): BagOfWords { + return BagOfWords(data.toMutableSet().apply { addAll(bagOfWords.data) }) + } + + override fun iterator(): Iterator { + return iterator { + data.forEach { yield(it) } + } + + } + + override fun toString(): String { + return data.joinToString(",") + } + +} \ No newline at end of file diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/DocumentFrequency.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/DocumentFrequency.kt new file mode 100644 index 0000000..91bd7b5 --- /dev/null +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/DocumentFrequency.kt @@ -0,0 +1,4 @@ +package de.itkl.textprocessing + +class DocumentFrequency { +} \ No newline at end of file diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt index d3721c4..54c3c67 100644 --- a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Histogram.kt @@ -1,6 +1,6 @@ package de.itkl.textprocessing -import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.* class Histogram(private val histo: MutableMap = mutableMapOf()) : Iterable>{ @@ -11,6 +11,14 @@ class Histogram(private val histo: MutableMap = mutableMapOf()) : I } } + suspend fun fromBagOfWords(flow: Flow): Histogram { + val result = Histogram() + flow.collect { bagOfWords -> + bagOfWords.forEach(result::add) + } + return result + } + fun from(sequence: Sequence>): Histogram { val histo = sequence.associate { map -> map["word"]!! to map["count"]!!.toUInt() } .toMutableMap() diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/TextFile.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/TextFile.kt index 54aab4f..082b99d 100644 --- a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/TextFile.kt +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/TextFile.kt @@ -11,6 +11,23 @@ import java.io.InputStreamReader class TextFile(val file: File) { + + fun splitByEmptyLines(progressOp: (read: Long) -> Unit = {}): Flow> { + val reader = InputStreamReader(ProgressInputStream(file.inputStream(), progressOp)) + var list = mutableListOf() + return flow { + reader.useLines { lines -> + lines.forEach { line -> + if(line.isEmpty()) { + emit(list) + list = mutableListOf() + } else { + list.add(line) + } + } + } + } + } fun words(progressOp: (read: Long) -> Unit = {}): Flow { val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY val tokenizer = StandardTokenizer(factory) diff --git a/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Tokenizer.kt b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Tokenizer.kt new file mode 100644 index 0000000..c7aa1c5 --- /dev/null +++ b/libraries/textprocessing/src/main/kotlin/de/itkl/textprocessing/Tokenizer.kt @@ -0,0 +1,44 @@ +package de.itkl.textprocessing + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onCompletion +import org.apache.lucene.analysis.standard.StandardTokenizer +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute +import org.apache.lucene.util.AttributeFactory +import java.io.StringReader + + +class Tokenizer : AutoCloseable{ + + private val tokenizer by lazy { + val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY + val tokenizer = StandardTokenizer(factory) + tokenizer +// val reader = ProgressInputStream(file.inputStream(), progressOp) +// tokenizer.setReader(InputStreamReader(reader)) +// tokenizer.reset() +// val attr = tokenizer.addAttribute(CharTermAttribute::class.java) +// return flow { +// while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true } ) { +// emit(attr.toString()) +// } +// }.onCompletion { +// tokenizer.close() +// } + } + fun tokenize(input: String): Flow { + val reader = StringReader(input) + tokenizer.setReader(reader) + tokenizer.reset() + val attr = tokenizer.addAttribute(CharTermAttribute::class.java) + return flow { + while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true }) { + emit(attr.toString()) + } + }.onCompletion { tokenizer.close() } + } + override fun close() { + tokenizer.close() + } +} \ No newline at end of file diff --git a/libraries/tfidf/build.gradle.kts b/libraries/tfidf/build.gradle.kts index 738ede3..de75945 100644 --- a/libraries/tfidf/build.gradle.kts +++ b/libraries/tfidf/build.gradle.kts @@ -4,6 +4,7 @@ plugins { dependencies { api(project(":libraries:textprocessing")) + api(project(":libraries:fileprocessing")) implementation("com.github.ajalt.mordant:mordant:2.2.0") implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.9.2") } diff --git a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt new file mode 100644 index 0000000..1d358be --- /dev/null +++ b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Idf.kt @@ -0,0 +1,40 @@ +package de.itkl.tfidf + +import de.itkl.fileprocessing.FileProcessor +import de.itkl.fileprocessing.Resource +import de.itkl.textprocessing.* +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.reduce +import kotlinx.coroutines.flow.take +import java.io.File +import java.nio.file.Path +import kotlin.io.path.nameWithoutExtension + +private val Log = KotlinLogging.logger { } + +class Idf : FileProcessor { + override fun willProduce(path: Path): Path { + return path.parent.resolve(path.nameWithoutExtension + "-idf.csv") + } + + override suspend fun process(resource: Resource): File { + Log.info { "Would produce: ${willProduce(resource.path)}" } + val resultFile = willProduce(resource.path).toFile() + val textFile = TextFile(resource.toFile()) + val bagOfWords = textFile.splitByEmptyLines { } + .take(10) + .map { document -> + val tokenizer = Tokenizer() + document.map { line -> + val tokens = tokenizer.tokenize(line) + BagOfWords.from(tokens) + } + .reduce { acc, bagOfWords -> acc.join(bagOfWords) } + } + val histogram = Histogram.fromBagOfWords(bagOfWords) + HistogramCsvStorage().save(histogram, resultFile) + return resultFile + } + +} \ No newline at end of file diff --git a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Tf.kt b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Tf.kt index ab80429..38c4480 100644 --- a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Tf.kt +++ b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/Tf.kt @@ -1,31 +1,9 @@ package de.itkl.tfidf +import com.github.doyaaaaaken.kotlincsv.dsl.csvReader import com.github.doyaaaaaken.kotlincsv.dsl.csvWriter import de.itkl.textprocessing.Histogram import io.github.oshai.kotlinlogging.KotlinLogging import java.io.File -import kotlin.math.max private val Log = KotlinLogging.logger { } -class Tf { - private val data: MutableMap = mutableMapOf() - fun update(histogram: Histogram): Tf { - val max = histogram.maxOf { (_, count) -> count } - .toDouble() - histogram.forEach { (word, count) -> - val tf = count.toDouble() / max - data[word] = tf - } - return this - } - - suspend fun saveToCsv(file: File) { - csvWriter {} - .openAsync(file, append = false) { - writeRow("term", "frequency") - data.forEach { (t, u) -> - writeRow(t, u) - } - } - } -} \ No newline at end of file diff --git a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/TfIdf.kt b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/TfIdf.kt index 3212c60..0e10e27 100644 --- a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/TfIdf.kt +++ b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/TfIdf.kt @@ -41,13 +41,6 @@ class TfIdf { return histogram } - suspend fun normalizeTf(histogram: Histogram, destination: File) { - Log.info { "Write tf to $destination" } - Tf() - .update(histogram) - .saveToCsv(destination) - } - private fun stemmer(language: Language): SnowballStemmer { return when(language) { Language.DE -> GermanStemmer() diff --git a/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/TfIdfPipeline.kt b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/TfIdfPipeline.kt new file mode 100644 index 0000000..38eb8be --- /dev/null +++ b/libraries/tfidf/src/main/kotlin/de/itkl/tfidf/TfIdfPipeline.kt @@ -0,0 +1,10 @@ +package de.itkl.tfidf + +import de.itkl.fileprocessing.FileProcessingPipeline +import de.itkl.fileprocessing.FileProcessor + +class TfIdfPipeline(private val language: Language) : FileProcessingPipeline() { + override val fileProcessor = listOf( + Idf() + ) +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 3337ee0..4274775 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -11,4 +11,5 @@ include( "app", "libraries:tfidf", "libraries:textprocessing", + "libraries:fileprocessing", )