trying to make a parallel flow

develop
Timo Bryant 2023-12-18 19:20:30 +01:00
parent 1acc87047f
commit 71e066fcde
12 changed files with 143 additions and 70 deletions

View File

@ -8,7 +8,7 @@ import com.github.ajalt.clikt.parameters.types.enum
import com.github.ajalt.clikt.parameters.types.file
import de.itkl.textprocessing.TextFile
import de.itkl.tfidf.Language
import de.itkl.tfidf.TfIdf
//import de.itkl.tfidf.TfIdf
import de.itkl.tfidf.TfIdfPipeline
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

View File

@ -9,6 +9,7 @@ private val Log = KotlinLogging.logger { }
abstract class FileProcessingPipeline {
protected abstract val fileProcessor: List<FileProcessor>
protected abstract val progressBarFactory: ProgressBarFactory
suspend fun input(file: File) {
var currentFile = file
fileProcessor.forEach { processor ->
@ -17,7 +18,9 @@ abstract class FileProcessingPipeline {
Log.info { "$target exists. Skipping" }
} else {
Log.info { "$target does not exists. Creating" }
processor.process(FileResource(currentFile))
val resource = FileResource(currentFile)
val progress = ProgressResource(resource, progressBarFactory)
processor.process(progress)
}
currentFile = target.toFile()
}

View File

@ -0,0 +1,9 @@
package de.itkl.fileprocessing
/**
 * Factory for [ProgressBar] instances, keyed by the [Resource] whose processing
 * should be visualised.
 */
interface ProgressBarFactory {
/**
 * Returns the progress bar to use for [resource].
 *
 * @param resource the resource the progress bar is created for
 * @return the [ProgressBar] that will receive progress updates for [resource]
 */
fun new(resource: Resource): ProgressBar
}
/**
 * Sink for read-progress notifications.
 *
 * Extends [AutoCloseable] so an implementation can release whatever it uses to
 * display progress once it is closed.
 */
interface ProgressBar : AutoCloseable {
/**
 * Reports progress to the bar.
 *
 * @param bytesRead number of bytes read so far
 *   (NOTE(review): presumably a running total rather than a delta — confirm against the caller)
 */
fun update(bytesRead: Long)
}

View File

@ -11,12 +11,12 @@ import java.io.InputStream
*/
class ProgressInputStream(
private val inputStream: InputStream,
private val updateOp: (Long) -> Unit) : InputStream() {
private val progressBar: ProgressBar) : InputStream() {
@Volatile
var bytesRead: Long = 0
private set(value) {
field = value
updateOp(value)
progressBar.update(value)
}
override fun read(): Int {
@ -36,4 +36,9 @@ class ProgressInputStream(
override fun read(b: ByteArray): Int {
return this.read(b, 0, b.size)
}
override fun close() {
progressBar.close()
super.close()
}
}

View File

@ -4,23 +4,28 @@ import java.io.File
import java.io.InputStream
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.name
/**
 * Abstraction over a readable input that is located by a [Path].
 */
interface Resource {
/** Location of the resource. */
val path: Path
/**
 * Size of the resource.
 * NOTE(review): overlaps with [length]; presumably both are byte counts — confirm which one callers should use.
 */
val size: Long
/** Name used to identify the resource, e.g. as a progress-bar label. */
val filename: String
/** Converts [path] to a [File]. */
fun toFile(): File = path.toFile()
/** Returns the length of the file at [path] as reported by [File.length]. */
fun length() = path.toFile().length()
/**
 * Opens the resource's content as an [InputStream].
 * NOTE(review): the caller presumably owns the returned stream and must close it — confirm.
 */
fun read(): InputStream
}
class ProgressResource(
private val resource: Resource,
private val progressOpSupplier: () -> (Long) -> Unit
private val progressBarFactory: ProgressBarFactory
) : Resource by resource
{
override fun read(): InputStream {
return ProgressInputStream(
read(),
progressOpSupplier()
resource.read(),
progressBarFactory.new(this)
)
}
}
@ -28,6 +33,9 @@ class ProgressResource(
class FileResource(override val path: Path) : Resource {
constructor(file: File): this(file.toPath())
override val size: Long by lazy { path.toFile().length() }
override val filename: String
get() = path.name
override fun read(): InputStream {
return Files.newInputStream(path)
}

View File

@ -5,6 +5,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
@ -19,7 +20,10 @@ class ParallelFlowProcessor<T,U>(
suspend fun process(flow: Flow<T>): Flow<U> {
return flow {
flow.map { kotlinx.coroutines.Runnable { mapperFn(it) } }
flow.map { kotlinx.coroutines.Runnable {
val result = mapperFn(it)
runBlocking { emit(result) }
} }
.map { job -> workers.submit(job)}
.toList()
.forEach { future -> emit(future.get() as U) }

View File

@ -13,8 +13,9 @@ class Histogram(private val histo: MutableMap<String,UInt> = mutableMapOf()) : I
suspend fun fromBagOfWords(flow: Flow<BagOfWords>): Histogram {
val result = Histogram()
flow.collect { bagOfWords ->
bagOfWords.forEach(result::add)
flow.collectIndexed { index, value ->
println(index)
value.forEach(result::add)
}
return result
}

View File

@ -7,13 +7,14 @@ import org.apache.lucene.analysis.standard.StandardTokenizer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.util.AttributeFactory
import java.io.File
import java.io.InputStream
import java.io.InputStreamReader
class TextFile(val file: File) {
class TextFile(val inputStream: InputStream) {
fun splitByEmptyLines(progressOp: (read: Long) -> Unit = {}): Flow<List<String>> {
val reader = InputStreamReader(ProgressInputStream(file.inputStream(), progressOp))
fun splitByEmptyLines(): Flow<List<String>> {
val reader = InputStreamReader(inputStream)
var list = mutableListOf<String>()
return flow {
reader.useLines { lines ->
@ -28,19 +29,19 @@ class TextFile(val file: File) {
}
}
}
fun words(progressOp: (read: Long) -> Unit = {}): Flow<String> {
val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
val tokenizer = StandardTokenizer(factory)
val reader = ProgressInputStream(file.inputStream(), progressOp)
tokenizer.setReader(InputStreamReader(reader))
tokenizer.reset()
val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
return flow {
while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true } ) {
emit(attr.toString())
}
}.onCompletion {
tokenizer.close()
}
}
// fun words(progressOp: (read: Long) -> Unit = {}): Flow<String> {
// val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
// val tokenizer = StandardTokenizer(factory)
// val reader = ProgressInputStream(file.inputStream(), progressOp)
// tokenizer.setReader(InputStreamReader(reader))
// tokenizer.reset()
// val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
// return flow {
// while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true } ) {
// emit(attr.toString())
// }
// }.onCompletion {
// tokenizer.close()
// }
// }
}

View File

@ -1,5 +1,6 @@
package de.itkl.tfidf
import com.github.ajalt.mordant.terminal.Terminal
import de.itkl.fileprocessing.FileProcessor
import de.itkl.fileprocessing.Resource
import de.itkl.processing.ParallelFlowProcessor
@ -22,8 +23,8 @@ class Idf : FileProcessor {
override suspend fun process(resource: Resource): File {
Log.info { "Would produce: ${willProduce(resource.path)}" }
val resultFile = willProduce(resource.path).toFile()
val textFile = TextFile(resource.toFile())
val documents = textFile.splitByEmptyLines { }
val textFile = TextFile(resource.read())
val documents = textFile.splitByEmptyLines()
val bagOfWords = ParallelFlowProcessor<List<String>, BagOfWords>(
mapperFn = { document ->
val tokenizer = Tokenizer()

View File

@ -0,0 +1,38 @@
package de.itkl.tfidf
import com.github.ajalt.mordant.animation.ProgressAnimation
import com.github.ajalt.mordant.animation.progressAnimation
import com.github.ajalt.mordant.terminal.Terminal
import de.itkl.fileprocessing.ProgressBar
import de.itkl.fileprocessing.ProgressBarFactory
import de.itkl.fileprocessing.Resource
/**
 * [ProgressBarFactory] that draws its progress bars on a [Terminal].
 *
 * Every bar created by one factory instance is rendered on the same [Terminal].
 */
class TerminalProgressBarFactory : ProgressBarFactory {
    private val terminal = Terminal()

    /**
     * Creates a [TerminalProgressBar] for [resource].
     *
     * The bar is labelled with the resource's filename and shows percentage, bar,
     * completed amount and remaining time; its total is the resource's length.
     *
     * @param resource the resource whose filename and length configure the bar
     * @return a progress bar backed by a terminal animation
     */
    override fun new(resource: Resource): ProgressBar =
        TerminalProgressBar(
            // The animation layout is built first, then the total is read from the resource.
            terminal.progressAnimation {
                text(resource.filename)
                percentage()
                progressBar()
                completed()
                timeRemaining()
            },
            resource.length()
        )
}
/**
 * [ProgressBar] that forwards progress to a terminal [ProgressAnimation].
 *
 * The animation is started and given its total as soon as the bar is constructed.
 *
 * @param animation the animation that renders the bar
 * @param total the value passed to the animation as its total
 */
class TerminalProgressBar(
    private val animation: ProgressAnimation,
    total: Long
) : ProgressBar {

    init {
        // Start first, then set the total.
        animation.run {
            start()
            updateTotal(total)
        }
    }

    /** Forwards [bytesRead] to the animation as the current progress value. */
    override fun update(bytesRead: Long) {
        animation.update(bytesRead)
    }

    /** Stops the animation. */
    override fun close() {
        animation.stop()
    }
}

View File

@ -13,43 +13,43 @@ import kotlin.io.path.exists
private val Log = KotlinLogging.logger { }
class TfIdf {
suspend fun computeTf(
corpus: File,
language: Language
): Histogram {
Log.info { "Processing $corpus" }
val destination = corpus.toPath().parent.resolve("${corpus.nameWithoutExtension}-terms.csv")
if(destination.exists()) {
return HistogramCsvStorage().read(destination.toFile())
}
val filesize = corpus.length()
val t = Terminal()
val histogram = t.progressBar("Indexing ${corpus.name}", filesize) { val stemmer = stemmer(language)
val words = TextFile(corpus).words {readBytes -> update(readBytes)}
.map { stemmer.stem(it) }
Histogram.from(words)
}
t.progressBar("Saving ${histogram.size} entries", histogram.size.toLong()) {
HistogramCsvStorage()
.save(histogram,destination.toFile()) { entriesWritten -> update(entriesWritten)}
}
return histogram
}
private fun stemmer(language: Language): SnowballStemmer {
return when(language) {
Language.DE -> GermanStemmer()
}
}
private fun SnowballStemmer.stem(word: String): String {
current = word
stem()
return current
}
}
//class TfIdf {
// suspend fun computeTf(
// corpus: File,
// language: Language
// ): Histogram {
// Log.info { "Processing $corpus" }
// val destination = corpus.toPath().parent.resolve("${corpus.nameWithoutExtension}-terms.csv")
//
// if(destination.exists()) {
// return HistogramCsvStorage().read(destination.toFile())
// }
//
// val filesize = corpus.length()
//
// val t = Terminal()
// val histogram = t.progressBar("Indexing ${corpus.name}", filesize) { val stemmer = stemmer(language)
// val words = TextFile(corpus).words {readBytes -> update(readBytes)}
// .map { stemmer.stem(it) }
// Histogram.from(words)
// }
//
// t.progressBar("Saving ${histogram.size} entries", histogram.size.toLong()) {
// HistogramCsvStorage()
// .save(histogram,destination.toFile()) { entriesWritten -> update(entriesWritten)}
// }
// return histogram
// }
//
// private fun stemmer(language: Language): SnowballStemmer {
// return when(language) {
// Language.DE -> GermanStemmer()
// }
// }
//
// private fun SnowballStemmer.stem(word: String): String {
// current = word
// stem()
// return current
// }
//}

View File

@ -2,9 +2,12 @@ package de.itkl.tfidf
import de.itkl.fileprocessing.FileProcessingPipeline
import de.itkl.fileprocessing.FileProcessor
import de.itkl.fileprocessing.ProgressBarFactory
/**
 * [FileProcessingPipeline] for the TF-IDF computation; it currently consists of the
 * single [Idf] processing step.
 *
 * @param language language of the processed corpus
 *   (NOTE(review): not read by anything visible in this class — confirm it is still needed)
 */
class TfIdfPipeline(private val language: Language) : FileProcessingPipeline() {
    override val fileProcessor = listOf(
        Idf()
    )

    // Initialised once instead of via a custom getter: a getter would build a new
    // TerminalProgressBarFactory (and with it a new Terminal) on every access,
    // whereas the factory is meant to share one terminal across all of its bars.
    override val progressBarFactory: ProgressBarFactory = TerminalProgressBarFactory()
}