trying to make it parallel
parent
d995b26459
commit
1acc87047f
|
|
@ -1,25 +1,31 @@
|
||||||
package de.itkl.processing
|
package de.itkl.processing
|
||||||
|
|
||||||
import kotlinx.coroutines.async
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.channels.consumeEach
|
import kotlinx.coroutines.flow.flow
|
||||||
import kotlinx.coroutines.coroutineScope
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.toList
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.coroutines.sync.Semaphore
|
import java.util.concurrent.Executors
|
||||||
import kotlinx.coroutines.sync.withPermit
|
import java.util.concurrent.TimeUnit
|
||||||
import kotlin.coroutines.coroutineContext
|
|
||||||
|
|
||||||
|
|
||||||
|
@Suppress("UNCHECKED_CAST")
|
||||||
class ParallelFlowProcessor<T,U>(
|
class ParallelFlowProcessor<T,U>(
|
||||||
private val mapperFn: (T) -> U,
|
private val mapperFn: (T) -> U) {
|
||||||
private val concurrencyLimit: Int) {
|
companion object {
|
||||||
suspend fun process(flow: Flow<T>): Flow<U> = coroutineScope {
|
private val workers = Executors.newWorkStealingPool(16)
|
||||||
val gate = Semaphore(concurrencyLimit)
|
}
|
||||||
flow.map { item ->
|
|
||||||
async {
|
suspend fun process(flow: Flow<T>): Flow<U> {
|
||||||
gate.withPermit { mapperFn(item) }
|
return flow {
|
||||||
|
flow.map { kotlinx.coroutines.Runnable { mapperFn(it) } }
|
||||||
|
.map { job -> workers.submit(job)}
|
||||||
|
.toList()
|
||||||
|
.forEach { future -> emit(future.get() as U) }
|
||||||
|
withContext(Dispatchers.IO) {
|
||||||
|
workers.awaitTermination(10000, TimeUnit.DAYS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.map { it.await() }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.toSet
|
||||||
class BagOfWords(private val data: MutableSet<String> = mutableSetOf()) : Iterable<String> {
|
class BagOfWords(private val data: MutableSet<String> = mutableSetOf()) : Iterable<String> {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
suspend fun from(flow: Flow<String>): BagOfWords {
|
fun from(flow: Sequence<String>): BagOfWords {
|
||||||
return BagOfWords(flow.toSet().toMutableSet())
|
return BagOfWords(flow.toSet().toMutableSet())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,36 +9,23 @@ import org.apache.lucene.util.AttributeFactory
|
||||||
import java.io.StringReader
|
import java.io.StringReader
|
||||||
|
|
||||||
|
|
||||||
class Tokenizer : AutoCloseable{
|
class Tokenizer {
|
||||||
|
|
||||||
private val tokenizer by lazy {
|
private val tokenizer by lazy {
|
||||||
val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
|
val factory = AttributeFactory.DEFAULT_ATTRIBUTE_FACTORY
|
||||||
val tokenizer = StandardTokenizer(factory)
|
val tokenizer = StandardTokenizer(factory)
|
||||||
tokenizer
|
tokenizer
|
||||||
// val reader = ProgressInputStream(file.inputStream(), progressOp)
|
|
||||||
// tokenizer.setReader(InputStreamReader(reader))
|
|
||||||
// tokenizer.reset()
|
|
||||||
// val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
|
|
||||||
// return flow {
|
|
||||||
// while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true } ) {
|
|
||||||
// emit(attr.toString())
|
|
||||||
// }
|
|
||||||
// }.onCompletion {
|
|
||||||
// tokenizer.close()
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
fun tokenize(input: String): Flow<String> {
|
fun tokenize(input: String): Sequence<String> {
|
||||||
val reader = StringReader(input)
|
val reader = StringReader(input)
|
||||||
tokenizer.setReader(reader)
|
tokenizer.setReader(reader)
|
||||||
tokenizer.reset()
|
tokenizer.reset()
|
||||||
val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
|
val attr = tokenizer.addAttribute(CharTermAttribute::class.java)
|
||||||
return flow {
|
return sequence {
|
||||||
while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true }) {
|
while (kotlin.runCatching { tokenizer.incrementToken() }.getOrElse { true }) {
|
||||||
emit(attr.toString())
|
yield(attr.toString())
|
||||||
}
|
}
|
||||||
}.onCompletion { tokenizer.close() }
|
tokenizer.close()
|
||||||
}
|
}
|
||||||
override fun close() {
|
|
||||||
tokenizer.close()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -2,6 +2,7 @@ package de.itkl.tfidf
|
||||||
|
|
||||||
import de.itkl.fileprocessing.FileProcessor
|
import de.itkl.fileprocessing.FileProcessor
|
||||||
import de.itkl.fileprocessing.Resource
|
import de.itkl.fileprocessing.Resource
|
||||||
|
import de.itkl.processing.ParallelFlowProcessor
|
||||||
import de.itkl.textprocessing.*
|
import de.itkl.textprocessing.*
|
||||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
|
|
@ -22,16 +23,20 @@ class Idf : FileProcessor {
|
||||||
Log.info { "Would produce: ${willProduce(resource.path)}" }
|
Log.info { "Would produce: ${willProduce(resource.path)}" }
|
||||||
val resultFile = willProduce(resource.path).toFile()
|
val resultFile = willProduce(resource.path).toFile()
|
||||||
val textFile = TextFile(resource.toFile())
|
val textFile = TextFile(resource.toFile())
|
||||||
val bagOfWords = textFile.splitByEmptyLines { }
|
val documents = textFile.splitByEmptyLines { }
|
||||||
.take(10)
|
val bagOfWords = ParallelFlowProcessor<List<String>, BagOfWords>(
|
||||||
.map { document ->
|
mapperFn = { document ->
|
||||||
val tokenizer = Tokenizer()
|
val tokenizer = Tokenizer()
|
||||||
document.map { line ->
|
val bagOfWords = document.map { line ->
|
||||||
val tokens = tokenizer.tokenize(line)
|
val tokens = tokenizer.tokenize(line)
|
||||||
BagOfWords.from(tokens)
|
BagOfWords.from(tokens)
|
||||||
}
|
}
|
||||||
.reduce { acc, bagOfWords -> acc.join(bagOfWords) }
|
.reduce { acc, bagOfWords -> acc.join(bagOfWords) }
|
||||||
|
bagOfWords
|
||||||
}
|
}
|
||||||
|
).process(documents)
|
||||||
|
|
||||||
|
|
||||||
val histogram = Histogram.fromBagOfWords(bagOfWords)
|
val histogram = Histogram.fromBagOfWords(bagOfWords)
|
||||||
HistogramCsvStorage().save(histogram, resultFile)
|
HistogramCsvStorage().save(histogram, resultFile)
|
||||||
return resultFile
|
return resultFile
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue