code cleanup

develop
Timo Bryant 2023-12-21 17:31:09 +01:00
parent 46f1c49ab1
commit 606837a76f
10 changed files with 34 additions and 132 deletions

View File

@ -22,7 +22,7 @@ class ComputeTf : CliktCommand() {
.required()
override fun run() = runBlocking {
TfIdfPipeline(language = Language.DE)
TfIdfPipeline(language = Language.DE, force = true)
.input(corpus)
// TextFile(corpus).splitByEmptyLines()
// .take(10)

View File

@ -6,7 +6,9 @@ import kotlin.io.path.exists
private val Log = KotlinLogging.logger { }
abstract class FileProcessingPipeline {
abstract class FileProcessingPipeline(private val force: Boolean = false) {
protected abstract val fileProcessor: List<FileProcessor>
protected abstract val progressBarFactory: ProgressBarFactory
@ -14,7 +16,7 @@ abstract class FileProcessingPipeline {
var currentFile = file
fileProcessor.forEach { processor ->
val target = processor.willProduce(currentFile.toPath())
if(target.exists()) {
if(target.exists() && !force) {
Log.info { "$target exists. Skipping" }
} else {
Log.info { "$target does not exists. Creating" }

View File

@ -1,12 +1,13 @@
package de.itkl.processing
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.*
private val Log = KotlinLogging.logger { }
class ParallelUnorderedFlow<U>(
private val producerJob: Job,
private val mapperFlow: Flow<U>
) : Flow<U> {
override suspend fun collect(collector: FlowCollector<U>) {
@ -22,13 +23,15 @@ suspend fun <T : Any, U : Any> Flow<T>.parallelUnordered(
val producerChannel = Channel<T>()
val producersJob = scope.launch(Dispatchers.Default) {
scope.launch(Dispatchers.Default) {
collect {
producerChannel.send(it)}
producerChannel.send(it)
}
producerChannel.close()
}
val mapperFlow = channelFlow {
(0..numWorkers).map { consumer ->
(0..numWorkers).map {
launch(Dispatchers.Default) {
producerChannel.consumeEach {
send(mapperFn(it))
@ -36,5 +39,5 @@ suspend fun <T : Any, U : Any> Flow<T>.parallelUnordered(
}
}
}
return ParallelUnorderedFlow(producersJob, mapperFlow)
return ParallelUnorderedFlow(mapperFlow)
}

View File

@ -11,37 +11,22 @@ import java.io.InputStream
import java.io.InputStreamReader
class TextFile(val inputStream: InputStream) {
class TextFile(private val inputStream: InputStream) {
fun splitByEmptyLines(): Flow<List<String>> {
val reader = InputStreamReader(inputStream)
var list = mutableListOf<String>()
return flow {
reader.useLines { lines ->
lines.forEach { line ->
if(line.isEmpty()) {
emit(list)
list = mutableListOf()
} else {
list.add(line)
return InputStreamReader(inputStream).use { reader ->
var list = mutableListOf<String>()
flow {
reader.useLines { lines ->
lines.forEach { line ->
if(line.isEmpty()) {
emit(list)
list = mutableListOf()
} else {
list.add(line)
}
}
}
}
}
}
// fun words(progressOp: (read: Long) -> Unit = {}): Flow<String> {
// val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
// val tokenizer = StandardTokenizer(factory)
// val reader = ProgressInputStream(file.inputStream(), progressOp)
// tokenizer.setReader(InputStreamReader(reader))
// tokenizer.reset()
// val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
// return flow {
// while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true } ) {
// emit(attr.toString())
// }
// }.onCompletion {
// tokenizer.close()
// }
// }
}

View File

@ -1,28 +1,21 @@
package de.itkl.tfidf
import com.github.ajalt.mordant.terminal.Terminal
import de.itkl.fileprocessing.FileProcessor
import de.itkl.fileprocessing.Resource
import de.itkl.processing.parallelUnordered
import de.itkl.textprocessing.*
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.io.File
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicInteger
import kotlin.io.path.nameWithoutExtension
private val Log = KotlinLogging.logger { }
class Idf : FileProcessor {
class DocumentFrequency : FileProcessor {
override fun willProduce(path: Path): Path {
return path.parent.resolve(path.nameWithoutExtension + "-idf.csv")
return path.parent.resolve(path.nameWithoutExtension + "-document-frequency.csv")
}
override suspend fun process(resource: Resource): File = coroutineScope {
@ -35,6 +28,7 @@ class Idf : FileProcessor {
result
}
.reduce { acc, other -> acc.join(other)}
Log.info { "Writing CSV $resultFile" }
HistogramCsvStorage().save(histogram, resultFile)
resultFile
}

View File

@ -0,0 +1,4 @@
package de.itkl.tfidf
class InverseDocumentFrequency {
}

View File

@ -1,9 +0,0 @@
package de.itkl.tfidf
import com.github.doyaaaaaken.kotlincsv.dsl.csvReader
import com.github.doyaaaaaken.kotlincsv.dsl.csvWriter
import de.itkl.textprocessing.Histogram
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.File
private val Log = KotlinLogging.logger { }

View File

@ -1,55 +0,0 @@
package de.itkl.tfidf
import com.github.ajalt.mordant.terminal.Terminal
import de.itkl.textprocessing.Histogram
import de.itkl.textprocessing.HistogramCsvStorage
import de.itkl.textprocessing.TextFile
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.flow.map
import org.tartarus.snowball.SnowballStemmer
import org.tartarus.snowball.ext.GermanStemmer
import java.io.File
import kotlin.io.path.exists
private val Log = KotlinLogging.logger { }
//class TfIdf {
// suspend fun computeTf(
// corpus: File,
// language: Language
// ): Histogram {
// Log.info { "Processing $corpus" }
// val destination = corpus.toPath().parent.resolve("${corpus.nameWithoutExtension}-terms.csv")
//
// if(destination.exists()) {
// return HistogramCsvStorage().read(destination.toFile())
// }
//
// val filesize = corpus.length()
//
// val t = Terminal()
// val histogram = t.progressBar("Indexing ${corpus.name}", filesize) { val stemmer = stemmer(language)
// val words = TextFile(corpus).words {readBytes -> update(readBytes)}
// .map { stemmer.stem(it) }
// Histogram.from(words)
// }
//
// t.progressBar("Saving ${histogram.size} entries", histogram.size.toLong()) {
// HistogramCsvStorage()
// .save(histogram,destination.toFile()) { entriesWritten -> update(entriesWritten)}
// }
// return histogram
// }
//
// private fun stemmer(language: Language): SnowballStemmer {
// return when(language) {
// Language.DE -> GermanStemmer()
// }
// }
//
// private fun SnowballStemmer.stem(word: String): String {
// current = word
// stem()
// return current
// }
//}

View File

@ -1,12 +1,11 @@
package de.itkl.tfidf
import de.itkl.fileprocessing.FileProcessingPipeline
import de.itkl.fileprocessing.FileProcessor
import de.itkl.fileprocessing.ProgressBarFactory
class TfIdfPipeline(private val language: Language) : FileProcessingPipeline() {
class TfIdfPipeline(private val language: Language, force: Boolean) : FileProcessingPipeline(force) {
override val fileProcessor = listOf(
Idf()
DocumentFrequency()
)
override val progressBarFactory: ProgressBarFactory
get() = TerminalProgressBarFactory()

View File

@ -1,21 +0,0 @@
package de.itkl.tfidf
import com.github.ajalt.mordant.animation.ProgressAnimation
import com.github.ajalt.mordant.animation.progressAnimation
import com.github.ajalt.mordant.terminal.Terminal
import java.awt.SystemColor.text
suspend fun <T> Terminal.progressBar(name: String, overall: Long, context: suspend ProgressAnimation.() -> T):T {
val progress = progressAnimation {
text(name)
percentage()
progressBar()
completed()
timeRemaining()
}
progress.start()
progress.updateTotal(overall)
val result = context(progress)
progress.stop()
return result
}