rewriting IDF stuff

develop
Timo Bryant 2023-12-17 17:46:51 +01:00
parent ca51b50306
commit d995b26459
18 changed files with 314 additions and 39 deletions

View File

@ -6,10 +6,12 @@ import com.github.ajalt.clikt.parameters.options.option
import com.github.ajalt.clikt.parameters.options.required import com.github.ajalt.clikt.parameters.options.required
import com.github.ajalt.clikt.parameters.types.enum import com.github.ajalt.clikt.parameters.types.enum
import com.github.ajalt.clikt.parameters.types.file import com.github.ajalt.clikt.parameters.types.file
import de.itkl.textprocessing.TextFile
import de.itkl.tfidf.Language import de.itkl.tfidf.Language
import de.itkl.tfidf.TfIdf import de.itkl.tfidf.TfIdf
import de.itkl.tfidf.TfIdfPipeline
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import java.io.File
class ComputeTf : CliktCommand() { class ComputeTf : CliktCommand() {
private val corpus by option(help = "corpus") private val corpus by option(help = "corpus")
@ -20,13 +22,17 @@ class ComputeTf : CliktCommand() {
.required() .required()
override fun run() = runBlocking { override fun run() = runBlocking {
val tfIdf = TfIdf() TfIdfPipeline(language = Language.DE)
val histogram = tfIdf.computeTf( .input(corpus)
corpus, // TextFile(corpus).splitByEmptyLines()
language // .take(10)
) // .collect { println(it) }
tfIdf.normalizeTf(histogram, corpus.toPath().parent.resolve("${corpus.nameWithoutExtension}-tf.csv").toFile()) // val tfIdf = TfIdf()
// val histogram = tfIdf.computeTf(
// corpus,
// language
// )
// val tf = tfIdf.normalizeTf(histogram, corpus.toPath().parent.resolve("${corpus.nameWithoutExtension}-tf.csv").toFile())
} }
} }

View File

@ -0,0 +1,7 @@
// Build script for the fileprocessing library module.
plugins {
// Shared Kotlin library conventions defined by the build logic plugin.
id("docthor.kotlin-library-conventions")
}
dependencies {
// Coroutines are required by FileProcessingPipeline's suspend API.
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
}

View File

@ -0,0 +1,10 @@
package de.itkl.fileprocessing
import java.io.File
import java.io.InputStream
import java.nio.file.Path
/**
 * A single step in a file-processing chain: consumes one input resource and
 * materializes one output file on disk.
 */
interface FileProcessor {
/** Returns the path this processor would write for the given input [path], without doing any work. */
fun willProduce(path: Path): Path
/** Processes [resource] and returns the produced file — presumably the path reported by [willProduce]; confirm with implementors. */
suspend fun process(resource: Resource): File
}

View File

@ -0,0 +1,25 @@
package de.itkl.fileprocessing
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.File
import kotlin.io.path.exists
private val Log = KotlinLogging.logger { }
/**
 * Runs a file through an ordered chain of [FileProcessor]s.
 *
 * Each step is skipped when its declared output file already exists, which
 * makes the pipeline resumable: re-running it only performs missing steps.
 * The output path of each step becomes the input of the next.
 */
abstract class FileProcessingPipeline {

    /** The ordered processors to apply; each consumes the previous step's output. */
    protected abstract val fileProcessor: List<FileProcessor>

    /**
     * Feeds [file] through every processor in order.
     *
     * NOTE(review): the [File] returned by [FileProcessor.process] is ignored;
     * the pipeline assumes each processor writes exactly the path reported by
     * [FileProcessor.willProduce] — confirm with implementors.
     */
    suspend fun input(file: File) {
        var currentFile = file
        fileProcessor.forEach { processor ->
            val target = processor.willProduce(currentFile.toPath())
            if (target.exists()) {
                Log.info { "$target exists. Skipping" }
            } else {
                // Fix: corrected log-message grammar ("does not exists").
                Log.info { "$target does not exist. Creating" }
                processor.process(FileResource(currentFile))
            }
            currentFile = target.toFile()
        }
    }
}

View File

@ -0,0 +1,39 @@
package de.itkl.fileprocessing
import java.io.InputStream
/**
* Represents an input stream that tracks the progress of reading from an underlying input stream.
*
* @property inputStream The underlying input stream to read from.
* @property updateOp The operation to be executed when the number of bytes read changes.
* @property bytesRead The number of bytes read from the input stream.
*/
/**
 * An [InputStream] decorator that counts the bytes read from [inputStream]
 * and reports every change of the running total through [updateOp].
 *
 * @property inputStream The underlying input stream to read from.
 * @property updateOp Invoked with the new total each time [bytesRead] changes.
 */
class ProgressInputStream(
    private val inputStream: InputStream,
    private val updateOp: (Long) -> Unit) : InputStream() {

    /** Total number of bytes read so far; every update notifies [updateOp]. */
    @Volatile
    var bytesRead: Long = 0
        private set(value) {
            field = value
            updateOp(value)
        }

    override fun read(): Int {
        val byte = inputStream.read()
        if (byte != -1) {
            bytesRead++
        }
        return byte
    }

    override fun read(b: ByteArray, off: Int, len: Int): Int {
        val bytesRead = inputStream.read(b, off, len)
        if (bytesRead != -1) {
            this.bytesRead += bytesRead
        }
        return bytesRead
    }

    override fun read(b: ByteArray): Int {
        return this.read(b, 0, b.size)
    }

    // Fix: InputStream.close() is a no-op by default, so the wrapped stream
    // was never closed through this decorator — delegate to avoid the leak.
    override fun close() {
        inputStream.close()
    }

    // Fix: the default implementation returns 0; delegate for correctness.
    override fun available(): Int = inputStream.available()
}

View File

@ -0,0 +1,34 @@
package de.itkl.fileprocessing
import java.io.File
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
/**
 * A readable blob of bytes identified by a filesystem [path].
 */
interface Resource {
/** Location of the resource on the filesystem. */
val path: Path
/** Size of the resource in bytes. */
val size: Long
/** Convenience view of [path] as a [File]. */
fun toFile(): File = path.toFile()
/** Opens an [InputStream] over the resource's content. */
fun read(): InputStream
}
/**
 * A [Resource] decorator whose [read] stream reports read progress.
 *
 * @property resource The underlying resource; all members except [read] are delegated to it.
 * @property progressOpSupplier Supplies a fresh progress callback for each opened stream.
 */
class ProgressResource(
    private val resource: Resource,
    private val progressOpSupplier: () -> (Long) -> Unit
) : Resource by resource {
    override fun read(): InputStream {
        // Fix: this previously called read() on itself, recursing infinitely
        // (StackOverflowError on first use). It must open the delegate's stream.
        return ProgressInputStream(
            resource.read(),
            progressOpSupplier()
        )
    }
}
/** A [Resource] backed by a regular file on the local filesystem. */
class FileResource(override val path: Path) : Resource {

    /** Convenience constructor for callers holding a [File]. */
    constructor(file: File) : this(file.toPath())

    // Computed lazily on first access and cached afterwards.
    override val size: Long by lazy { path.toFile().length() }

    override fun read(): InputStream = Files.newInputStream(path)
}

View File

@ -0,0 +1,25 @@
package de.itkl.processing
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlin.coroutines.coroutineContext
/**
 * Maps the items of a [Flow] through [mapperFn] with at most
 * [concurrencyLimit] items in flight at once, preserving emission order.
 */
class ParallelFlowProcessor<T, U>(
    private val mapperFn: (T) -> U,
    private val concurrencyLimit: Int) {

    /**
     * Returns a flow of mapped results in input order.
     *
     * Fix: the previous version built the flow inside a `coroutineScope` and
     * returned it — the scope had already completed by the time the flow was
     * collected, so the `async` workers attached to a finished job; in
     * addition the unbuffered map/await pair made the processing effectively
     * sequential. `channelFlow` ties the workers to the collector's scope,
     * and its internal buffer lets several jobs run concurrently while the
     * semaphore caps them at [concurrencyLimit].
     */
    suspend fun process(flow: Flow<T>): Flow<U> {
        val gate = Semaphore(concurrencyLimit)
        return channelFlow {
            flow.collect { item ->
                send(async { gate.withPermit { mapperFn(item) } })
            }
        }.map { deferred -> deferred.await() }
    }
}

View File

@ -0,0 +1,33 @@
package de.itkl.textprocessing
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toSet
/**
 * An unordered collection of distinct words.
 */
class BagOfWords(private val data: MutableSet<String> = mutableSetOf()) : Iterable<String> {

    /** Adds a single word; duplicates are ignored. */
    fun add(word: String) {
        data.add(word)
    }

    /** Returns a new bag with the union of this bag and [bagOfWords]; neither input is modified. */
    fun join(bagOfWords: BagOfWords): BagOfWords =
        BagOfWords((data + bagOfWords.data).toMutableSet())

    override fun iterator(): Iterator<String> = data.iterator()

    override fun toString(): String = data.joinToString(",")

    companion object {
        /** Collects every word emitted by [flow] into a new bag. */
        suspend fun from(flow: Flow<String>): BagOfWords =
            BagOfWords(flow.toSet().toMutableSet())
    }
}

View File

@ -0,0 +1,4 @@
package de.itkl.textprocessing
// Placeholder for document-frequency computation; no implementation yet.
class DocumentFrequency {
}

View File

@ -1,6 +1,6 @@
package de.itkl.textprocessing package de.itkl.textprocessing
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.*
class Histogram(private val histo: MutableMap<String,UInt> = mutableMapOf()) : Iterable<Pair<String, UInt>>{ class Histogram(private val histo: MutableMap<String,UInt> = mutableMapOf()) : Iterable<Pair<String, UInt>>{
@ -11,6 +11,14 @@ class Histogram(private val histo: MutableMap<String,UInt> = mutableMapOf()) : I
} }
} }
suspend fun fromBagOfWords(flow: Flow<BagOfWords>): Histogram {
val result = Histogram()
flow.collect { bagOfWords ->
bagOfWords.forEach(result::add)
}
return result
}
fun from(sequence: Sequence<Map<String, String>>): Histogram { fun from(sequence: Sequence<Map<String, String>>): Histogram {
val histo = sequence.associate { map -> map["word"]!! to map["count"]!!.toUInt() } val histo = sequence.associate { map -> map["word"]!! to map["count"]!!.toUInt() }
.toMutableMap() .toMutableMap()

View File

@ -11,6 +11,23 @@ import java.io.InputStreamReader
class TextFile(val file: File) { class TextFile(val file: File) {
/**
 * Splits the file into blocks of consecutive non-empty lines, using empty
 * lines as separators, and emits each block as a list of strings.
 *
 * Fixes over the previous version:
 *  - the reader and the accumulator now live inside the flow builder, so the
 *    returned cold flow can safely be collected more than once;
 *  - a trailing block is no longer dropped when the file does not end with
 *    an empty line.
 *
 * @param progressOp invoked with the running byte count as the file is read.
 */
fun splitByEmptyLines(progressOp: (read: Long) -> Unit = {}): Flow<List<String>> {
    return flow {
        val reader = InputStreamReader(ProgressInputStream(file.inputStream(), progressOp))
        var block = mutableListOf<String>()
        reader.useLines { lines ->
            lines.forEach { line ->
                if (line.isEmpty()) {
                    emit(block)
                    block = mutableListOf()
                } else {
                    block.add(line)
                }
            }
        }
        // Emit the final block if the file did not end with an empty line.
        if (block.isNotEmpty()) {
            emit(block)
        }
    }
}
fun words(progressOp: (read: Long) -> Unit = {}): Flow<String> { fun words(progressOp: (read: Long) -> Unit = {}): Flow<String> {
val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
val tokenizer = StandardTokenizer(factory) val tokenizer = StandardTokenizer(factory)

View File

@ -0,0 +1,44 @@
package de.itkl.textprocessing
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import org.apache.lucene.analysis.standard.StandardTokenizer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.util.AttributeFactory
import java.io.StringReader
/**
 * Wraps a reusable Lucene [StandardTokenizer] and exposes tokenization as a
 * [Flow] of token strings.
 *
 * Not thread-safe: the underlying tokenizer is shared mutable state, and a
 * returned flow must be fully collected before [tokenize] is called again.
 */
class Tokenizer : AutoCloseable {

    // Lazily create the Lucene tokenizer once; it is reused across tokenize() calls.
    private val tokenizer by lazy {
        val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
        StandardTokenizer(factory)
    }

    /**
     * Tokenizes [input] and returns a cold flow of its tokens.
     *
     * Fix: the loop previously used `getOrElse { true }`, which turned any
     * exception from `incrementToken()` into an infinite loop re-emitting the
     * last token. A failure now terminates the stream instead.
     */
    fun tokenize(input: String): Flow<String> {
        val reader = StringReader(input)
        tokenizer.setReader(reader)
        tokenizer.reset()
        val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
        return flow {
            while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { false }) {
                emit(attr.toString())
            }
        }.onCompletion {
            // end() finishes the token stream, close() releases the current
            // reader; per Lucene's TokenStream contract the tokenizer can then
            // be reused via setReader() on the next tokenize() call.
            tokenizer.end()
            tokenizer.close()
        }
    }

    override fun close() {
        tokenizer.close()
    }
}

View File

@ -4,6 +4,7 @@ plugins {
dependencies { dependencies {
api(project(":libraries:textprocessing")) api(project(":libraries:textprocessing"))
api(project(":libraries:fileprocessing"))
implementation("com.github.ajalt.mordant:mordant:2.2.0") implementation("com.github.ajalt.mordant:mordant:2.2.0")
implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.9.2") implementation("com.github.doyaaaaaken:kotlin-csv-jvm:1.9.2")
} }

View File

@ -0,0 +1,40 @@
package de.itkl.tfidf
import de.itkl.fileprocessing.FileProcessor
import de.itkl.fileprocessing.Resource
import de.itkl.textprocessing.*
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.flow.take
import java.io.File
import java.nio.file.Path
import kotlin.io.path.nameWithoutExtension
private val Log = KotlinLogging.logger { }
/**
 * A [FileProcessor] that builds the document-frequency histogram needed for
 * IDF: each empty-line-separated block of the input file is treated as one
 * document, reduced to its set of distinct tokens, and the per-document sets
 * are accumulated into a histogram stored as `<name>-idf.csv`.
 */
class Idf : FileProcessor {

    /** The output lives next to the input, with an `-idf.csv` suffix. */
    override fun willProduce(path: Path): Path {
        return path.parent.resolve(path.nameWithoutExtension + "-idf.csv")
    }

    override suspend fun process(resource: Resource): File {
        Log.info { "Would produce: ${willProduce(resource.path)}" }
        val resultFile = willProduce(resource.path).toFile()
        val textFile = TextFile(resource.toFile())
        // Fix: removed the leftover `.take(10)` debug cap, which silently
        // restricted the IDF statistics to the first ten documents.
        val bagOfWords = textFile.splitByEmptyLines { }
            .map { document ->
                // One tokenizer per document, released deterministically.
                Tokenizer().use { tokenizer ->
                    // fold instead of reduce: an empty document yields an
                    // empty bag instead of throwing UnsupportedOperationException.
                    document.fold(BagOfWords()) { acc, line ->
                        acc.join(BagOfWords.from(tokenizer.tokenize(line)))
                    }
                }
            }
        val histogram = Histogram.fromBagOfWords(bagOfWords)
        HistogramCsvStorage().save(histogram, resultFile)
        return resultFile
    }
}

View File

@ -1,31 +1,9 @@
package de.itkl.tfidf package de.itkl.tfidf
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.dsl.csvWriter import com.github.doyaaaaaken.kotlincsv.dsl.csvWriter
import de.itkl.textprocessing.Histogram import de.itkl.textprocessing.Histogram
import io.github.oshai.kotlinlogging.KotlinLogging import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.File import java.io.File
import kotlin.math.max
private val Log = KotlinLogging.logger { } private val Log = KotlinLogging.logger { }
class Tf {
private val data: MutableMap<String, Double> = mutableMapOf()
fun update(histogram: Histogram): Tf {
val max = histogram.maxOf { (_, count) -> count }
.toDouble()
histogram.forEach { (word, count) ->
val tf = count.toDouble() / max
data[word] = tf
}
return this
}
suspend fun saveToCsv(file: File) {
csvWriter {}
.openAsync(file, append = false) {
writeRow("term", "frequency")
data.forEach { (t, u) ->
writeRow(t, u)
}
}
}
}

View File

@ -41,13 +41,6 @@ class TfIdf {
return histogram return histogram
} }
suspend fun normalizeTf(histogram: Histogram, destination: File) {
Log.info { "Write tf to $destination" }
Tf()
.update(histogram)
.saveToCsv(destination)
}
private fun stemmer(language: Language): SnowballStemmer { private fun stemmer(language: Language): SnowballStemmer {
return when(language) { return when(language) {
Language.DE -> GermanStemmer() Language.DE -> GermanStemmer()

View File

@ -0,0 +1,10 @@
package de.itkl.tfidf
import de.itkl.fileprocessing.FileProcessingPipeline
import de.itkl.fileprocessing.FileProcessor
/**
 * The concrete file-processing pipeline for TF-IDF computation.
 *
 * Currently runs only the [Idf] step.
 */
class TfIdfPipeline(private val language: Language) : FileProcessingPipeline() {
// NOTE(review): `language` is not used anywhere in this class yet —
// presumably intended for language-specific tokenization/stemming in later
// steps; confirm before removing.
override val fileProcessor = listOf(
Idf()
)
}

View File

@ -11,4 +11,5 @@ include(
"app", "app",
"libraries:tfidf", "libraries:tfidf",
"libraries:textprocessing", "libraries:textprocessing",
"libraries:fileprocessing",
) )