diff --git a/app/src/main/java/io/legado/app/constant/BookSourceType.kt b/app/src/main/java/io/legado/app/constant/BookSourceType.kt index f961a9f65..fc0d4f1f0 100644 --- a/app/src/main/java/io/legado/app/constant/BookSourceType.kt +++ b/app/src/main/java/io/legado/app/constant/BookSourceType.kt @@ -2,6 +2,7 @@ package io.legado.app.constant import androidx.annotation.IntDef +@Suppress("ConstPropertyName") object BookSourceType { const val default = 0 // 0 文本 diff --git a/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt b/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt index b089587bf..685a12cd0 100644 --- a/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt +++ b/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt @@ -250,8 +250,12 @@ interface BookSourceDao { @get:Query("select * from book_sources where loginUrl is not null and loginUrl != ''") val allLogin: List - @get:Query("select * from book_sources where enabled = 1 and bookSourceType = 0 order by customOrder") - val allTextEnabled: List + @get:Query( + """select bookSourceUrl, bookSourceName, bookSourceGroup, customOrder, enabled, enabledExplore, + trim(loginUrl) <> '' hasLoginUrl, lastUpdateTime, respondTime, weight, trim(exploreUrl) <> '' hasExploreUrl + from book_sources where enabled = 1 and bookSourceType = 0 order by customOrder""" + ) + val allTextEnabledPart: List @get:Query("select distinct bookSourceGroup from book_sources where trim(bookSourceGroup) <> ''") val allGroupsUnProcessed: List diff --git a/app/src/main/java/io/legado/app/help/book/BookHelp.kt b/app/src/main/java/io/legado/app/help/book/BookHelp.kt index acbf32566..99fbdafcd 100644 --- a/app/src/main/java/io/legado/app/help/book/BookHelp.kt +++ b/app/src/main/java/io/legado/app/help/book/BookHelp.kt @@ -479,6 +479,15 @@ object BookHelp { } } + fun getDurChapter( + oldBook: Book, + newChapterList: List + ): Int { + return oldBook.run { + getDurChapter(durChapterIndex, durChapterTitle, newChapterList, totalChapterNum) + } + } + private val chapterNamePattern1 by lazy { Pattern.compile(".*?第([\\d零〇一二两三四五六七八九十百千万壹贰叁肆伍陆柒捌玖拾佰仟]+)[章节篇回集话]") } diff --git a/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt b/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt index 2628360c4..50b811063 100644 --- a/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt +++ b/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt @@ -10,14 +10,13 @@ import io.legado.app.exception.NoStackTraceException import io.legado.app.help.config.AppConfig import io.legado.app.ui.book.search.SearchScope import io.legado.app.utils.getPrefBoolean -import io.legado.app.utils.mapNotNullParallel +import io.legado.app.utils.mapParallelSafe import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExecutorCoroutineDispatcher import kotlinx.coroutines.Job import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow @@ -83,16 +82,11 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB } }.onStart { callBack.onSearchStart() - }.mapNotNullParallel(threadCount) { - try { - withTimeout(30000L) { - WebBook.searchBookAwait(it, searchKey, searchPage) - } - } catch (e: Throwable) { - currentCoroutineContext().ensureActive() - null + }.mapParallelSafe(threadCount) { + withTimeout(30000L) { + WebBook.searchBookAwait(it, searchKey, searchPage) } - }.buffer(0).onEach { items -> + }.onEach { items -> appDb.searchBookDao.insert(*items.toTypedArray()) mergeItems(items, precision) currentCoroutineContext().ensureActive() diff --git a/app/src/main/java/io/legado/app/service/CheckSourceService.kt b/app/src/main/java/io/legado/app/service/CheckSourceService.kt index ac57fab8c..37ec9f0e9 100644 --- a/app/src/main/java/io/legado/app/service/CheckSourceService.kt +++ b/app/src/main/java/io/legado/app/service/CheckSourceService.kt @@ -114,7 +114,7 @@ class CheckSourceService : BaseService() { upNotification() }.onEachParallel(threadCount) { checkSource(it) - }.buffer(0).onEach { + }.onEach { finishCount++ notificationMsg = getString( R.string.progress_show, diff --git a/app/src/main/java/io/legado/app/ui/book/read/ReadBookViewModel.kt b/app/src/main/java/io/legado/app/ui/book/read/ReadBookViewModel.kt index dfe18ed24..255e99151 100644 --- a/app/src/main/java/io/legado/app/ui/book/read/ReadBookViewModel.kt +++ b/app/src/main/java/io/legado/app/ui/book/read/ReadBookViewModel.kt @@ -34,10 +34,19 @@ import io.legado.app.ui.book.searchContent.SearchResult import io.legado.app.utils.DocumentUtils import io.legado.app.utils.FileUtils import io.legado.app.utils.isContentScheme +import io.legado.app.utils.mapParallelSafe import io.legado.app.utils.postEvent import io.legado.app.utils.toStringArray import io.legado.app.utils.toastOnUi import kotlinx.coroutines.Dispatchers.IO +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onEmpty +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.take import java.io.File import java.io.FileInputStream import java.io.FileNotFoundException @@ -266,39 +275,44 @@ class ReadBookViewModel(application: Application) : BaseViewModel(application) { private fun autoChangeSource(name: String, author: String) { if (!AppConfig.autoChangeSource) return execute { - val sources = appDb.bookSourceDao.allTextEnabled - sources.forEach { source -> - WebBook.preciseSearchAwait(this, source, name, author).getOrNull()?.let { book -> - if (book.tocUrl.isEmpty()) { - WebBook.getBookInfoAwait(source, book) - } - val toc = WebBook.getChapterListAwait(source, book).getOrThrow() - val chapter = toc.getOrElse(book.durChapterIndex) { - toc.last() - } - val nextChapter = toc.getOrElse(chapter.index) { - toc.first() - } - kotlin.runCatching { - WebBook.getContentAwait( - bookSource = source, - book = book, - bookChapter = chapter, - nextChapterUrl = nextChapter.url - ) - changeTo(book, toc) - return@execute + val sources = appDb.bookSourceDao.allTextEnabledPart + flow { + for (source in sources) { + source.getBookSource()?.let { + emit(it) } } - } - throw NoStackTraceException("没有合适书源") - }.onStart { - ReadBook.upMsg(context.getString(R.string.source_auto_changing)) - }.onError { - AppLog.put("自动换源失败\n${it.localizedMessage}", it) - context.toastOnUi("自动换源失败\n${it.localizedMessage}") - }.onFinally { - ReadBook.upMsg(null) + }.onStart { + ReadBook.upMsg(context.getString(R.string.source_auto_changing)) + }.mapParallelSafe(AppConfig.threadCount) { source -> + val book = WebBook.preciseSearchAwait(this, source, name, author).getOrThrow() + if (book.tocUrl.isEmpty()) { + WebBook.getBookInfoAwait(source, book) + } + val toc = WebBook.getChapterListAwait(source, book).getOrThrow() + val chapter = toc.getOrElse(book.durChapterIndex) { + toc.last() + } + val nextChapter = toc.getOrElse(chapter.index) { + toc.first() + } + WebBook.getContentAwait( + bookSource = source, + book = book, + bookChapter = chapter, + nextChapterUrl = nextChapter.url + ) + book to toc + }.take(1).onEach { (book, toc) -> + changeTo(book, toc) + }.onEmpty { + throw NoStackTraceException("没有合适书源") + }.onCompletion { + ReadBook.upMsg(null) + }.catch { + AppLog.put("自动换源失败\n${it.localizedMessage}", it) + context.toastOnUi("自动换源失败\n${it.localizedMessage}") + }.collect() } } diff --git a/app/src/main/java/io/legado/app/utils/FlowExtensions.kt b/app/src/main/java/io/legado/app/utils/FlowExtensions.kt index 5a173252e..66e4f6728 100644 --- a/app/src/main/java/io/legado/app/utils/FlowExtensions.kt +++ b/app/src/main/java/io/legado/app/utils/FlowExtensions.kt @@ -2,7 +2,11 @@ package io.legado.app.utils import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.async +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flatMapMerge @@ -20,13 +24,57 @@ inline fun Flow.onEachParallel( action(value) emit(value) } -} +}.buffer(0) + +@OptIn(ExperimentalCoroutinesApi::class) +inline fun Flow.onEachParallelSafe( + concurrency: Int, + crossinline action: suspend (T) -> Unit +): Flow = flatMapMerge(concurrency) { value -> + flow { + try { + action(value) + } catch (e: Throwable) { + currentCoroutineContext().ensureActive() + } + emit(value) + } +}.buffer(0) @OptIn(ExperimentalCoroutinesApi::class) inline fun Flow.mapParallel( concurrency: Int, crossinline transform: suspend (T) -> R, -): Flow = flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } } +): Flow = flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } }.buffer(0) + + +@OptIn(ExperimentalCoroutinesApi::class) +inline fun Flow.mapParallelSafe( + concurrency: Int, + crossinline transform: suspend (T) -> R, +): Flow = flatMapMerge(concurrency) { value -> + flow { + try { + emit(transform(value)) + } catch (e: Throwable) { + currentCoroutineContext().ensureActive() + } + } +}.buffer(0) + +@OptIn(ExperimentalCoroutinesApi::class) +inline fun Flow.transformParallelSafe( + concurrency: Int, + crossinline transform: suspend FlowCollector.(T) -> R, +): Flow = flatMapMerge(concurrency) { value -> + flow { + try { + transform(value) + } catch (e: Throwable) { + currentCoroutineContext().ensureActive() + } + } +}.buffer(0) inline fun Flow.mapNotNullParallel( concurrency: Int, @@ -67,7 +115,7 @@ inline fun Flow.mapAsync( }.map { it.await() }.onEach { semaphore.release() } - } + }.buffer(0) } inline fun Flow.mapAsyncIndexed( @@ -89,7 +137,7 @@ inline fun Flow.mapAsyncIndexed( }.map { it.await() }.onEach { semaphore.release() } - } + }.buffer(0) } inline fun Flow.onEachAsync( @@ -110,7 +158,7 @@ inline fun Flow.onEachAsync( }.map { it.await() }.onEach { semaphore.release() } - } + }.buffer(0) } inline fun Flow.onEachAsyncIndexed( @@ -135,5 +183,5 @@ inline fun Flow.onEachAsyncIndexed( }.map { it.await() }.onEach { semaphore.release() } - } + }.buffer(0) }