diff --git a/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt b/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt index f3cd534c5..b089587bf 100644 --- a/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt +++ b/app/src/main/java/io/legado/app/data/dao/BookSourceDao.kt @@ -190,6 +190,19 @@ interface BookSourceDao { ) fun getEnabledByGroup(group: String): List + @Query( + """select bookSourceUrl, bookSourceName, bookSourceGroup, customOrder, enabled, enabledExplore, + trim(loginUrl) <> '' hasLoginUrl, lastUpdateTime, respondTime, weight, trim(exploreUrl) <> '' hasExploreUrl + from book_sources + where enabled = 1 + and (bookSourceGroup = :group + or bookSourceGroup like :group || ',%' + or bookSourceGroup like '%,' || :group + or bookSourceGroup like '%,' || :group || ',%') + order by customOrder asc""" + ) + fun getEnabledPartByGroup(group: String): List + @Query("select * from book_sources where bookUrlPattern != 'NONE' and bookSourceType = :type order by customOrder asc") fun getEnabledByType(type: Int): List @@ -215,6 +228,13 @@ interface BookSourceDao { @get:Query("select * from book_sources where enabled = 1 order by customOrder") val allEnabled: List + @get:Query( + """select bookSourceUrl, bookSourceName, bookSourceGroup, customOrder, enabled, enabledExplore, + trim(loginUrl) <> '' hasLoginUrl, lastUpdateTime, respondTime, weight, trim(exploreUrl) <> '' hasExploreUrl + from book_sources where enabled = 1 order by customOrder asc""" + ) + val allEnabledPart: List + @get:Query("select * from book_sources where enabled = 0 order by customOrder") val allDisabled: List @@ -242,6 +262,13 @@ interface BookSourceDao { @Query("select * from book_sources where bookSourceUrl = :key") fun getBookSource(key: String): BookSource? + @Query( + """select bookSourceUrl, bookSourceName, bookSourceGroup, customOrder, enabled, enabledExplore, + trim(loginUrl) <> '' hasLoginUrl, lastUpdateTime, respondTime, weight, trim(exploreUrl) <> '' hasExploreUrl + from book_sources where bookSourceUrl = :key""" + ) + fun getBookSourcePart(key: String): BookSourcePart? + @Query("select count(*) from book_sources") fun allCount(): Int diff --git a/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt b/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt index 2cc665390..2628360c4 100644 --- a/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt +++ b/app/src/main/java/io/legado/app/model/webBook/SearchModel.kt @@ -1,23 +1,34 @@ package io.legado.app.model.webBook import io.legado.app.constant.AppConst +import io.legado.app.constant.AppLog import io.legado.app.constant.PreferKey import io.legado.app.data.appDb -import io.legado.app.data.entities.BookSource +import io.legado.app.data.entities.BookSourcePart import io.legado.app.data.entities.SearchBook import io.legado.app.exception.NoStackTraceException import io.legado.app.help.config.AppConfig -import io.legado.app.help.coroutine.CompositeCoroutine import io.legado.app.ui.book.search.SearchScope import io.legado.app.utils.getPrefBoolean +import io.legado.app.utils.mapNotNullParallel import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.ExecutorCoroutineDispatcher +import kotlinx.coroutines.Job import kotlinx.coroutines.asCoroutineDispatcher +import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.isActive +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout import splitties.init.appCtx import java.util.concurrent.Executors +import kotlin.coroutines.coroutineContext import kotlin.math.min class SearchModel(private val scope: CoroutineScope, private val callBack: CallBack) { @@ -26,12 +37,10 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB private var mSearchId = 0L private var searchPage = 1 private var searchKey: String = "" - private var tasks = CompositeCoroutine() - private var bookSourceList = arrayListOf() + private var bookSourceParts = emptyList() private var searchBooks = arrayListOf() + private var searchJob: Job? = null - @Volatile - private var searchIndex = -1 private fun initSearchPool() { searchPool?.close() @@ -40,97 +49,70 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB } fun search(searchId: Long, key: String) { - callBack.onSearchStart() if (searchId != mSearchId) { if (key.isEmpty()) { - callBack.onSearchCancel() return - } else { - this.searchKey = key } + searchKey = key if (mSearchId != 0L) { close() } - initSearchPool() - mSearchId = searchId - searchPage = 1 - bookSourceList.clear() searchBooks.clear() - callBack.onSearchSuccess(searchBooks) - bookSourceList.addAll(callBack.getSearchScope().getBookSources()) - if (bookSourceList.isEmpty()) { + bookSourceParts = callBack.getSearchScope().getBookSourceParts() + if (bookSourceParts.isEmpty()) { callBack.onSearchCancel(NoStackTraceException("启用书源为空")) return } + mSearchId = searchId + searchPage = 1 + initSearchPool() } else { searchPage++ } - searchIndex = -1 - for (i in 0 until threadCount) { - search(searchId) + startSearch() + } + + private fun startSearch() { + val precision = appCtx.getPrefBoolean(PreferKey.precisionSearch) + searchJob = scope.launch(searchPool!!) { + flow { + for (bs in bookSourceParts) { + bs.getBookSource()?.let { + emit(it) + } + } + }.onStart { + callBack.onSearchStart() + }.mapNotNullParallel(threadCount) { + try { + withTimeout(30000L) { + WebBook.searchBookAwait(it, searchKey, searchPage) + } + } catch (e: Throwable) { + currentCoroutineContext().ensureActive() + null + } + }.buffer(0).onEach { items -> + appDb.searchBookDao.insert(*items.toTypedArray()) + mergeItems(items, precision) + currentCoroutineContext().ensureActive() + callBack.onSearchSuccess(searchBooks) + }.onCompletion { + if (it == null) callBack.onSearchFinish(searchBooks.isEmpty()) + }.catch { + AppLog.put("书源搜索出错\n${it.localizedMessage}", it) + }.collect() } } - @Synchronized - private fun search(searchId: Long) { - if (searchIndex >= bookSourceList.lastIndex) { - return - } - searchIndex++ - val source = bookSourceList.getOrNull(searchIndex) ?: return - val searchPool = searchPool ?: return - val task = WebBook.searchBook( - scope, - source, - searchKey, - searchPage, - context = searchPool, - start = CoroutineStart.LAZY, - executeContext = searchPool - ).timeout(30000L) - .onSuccess { - ensureActive() - onSuccess(searchId, it) - } - .onFinally { - onFinally(searchId) - } - task.start() - tasks.add(task) - } - - @Synchronized - private fun onSuccess(searchId: Long, items: ArrayList) { - if (searchId == mSearchId) { - appDb.searchBookDao.insert(*items.toTypedArray()) - val precision = appCtx.getPrefBoolean(PreferKey.precisionSearch) - mergeItems(scope, items, precision) - callBack.onSearchSuccess(searchBooks) - } - } - - @Synchronized - private fun onFinally(searchId: Long) { - if (searchIndex < bookSourceList.lastIndex) { - search(searchId) - } else { - searchIndex++ - } - if (searchIndex >= bookSourceList.lastIndex - + min(bookSourceList.size, threadCount) - ) { - callBack.onSearchFinish(searchBooks.isEmpty()) - } - } - - private fun mergeItems(scope: CoroutineScope, newDataS: List, precision: Boolean) { + private suspend fun mergeItems(newDataS: List, precision: Boolean) { if (newDataS.isNotEmpty()) { val copyData = ArrayList(searchBooks) val equalData = arrayListOf() val containsData = arrayListOf() val otherData = arrayListOf() copyData.forEach { - if (!scope.isActive) return + coroutineContext.ensureActive() if (it.name == searchKey || it.author == searchKey) { equalData.add(it) } else if (it.name.contains(searchKey) || it.author.contains(searchKey)) { @@ -140,11 +122,11 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB } } newDataS.forEach { nBook -> - if (!scope.isActive) return + coroutineContext.ensureActive() if (nBook.name == searchKey || nBook.author == searchKey) { var hasSame = false equalData.forEach { pBook -> - if (!scope.isActive) return + coroutineContext.ensureActive() if (pBook.name == nBook.name && pBook.author == nBook.author) { pBook.addOrigin(nBook.origin) hasSame = true @@ -156,7 +138,7 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB } else if (nBook.name.contains(searchKey) || nBook.author.contains(searchKey)) { var hasSame = false containsData.forEach { pBook -> - if (!scope.isActive) return + coroutineContext.ensureActive() if (pBook.name == nBook.name && pBook.author == nBook.author) { pBook.addOrigin(nBook.origin) hasSame = true @@ -168,7 +150,7 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB } else if (!precision) { var hasSame = false otherData.forEach { pBook -> - if (!scope.isActive) return + coroutineContext.ensureActive() if (pBook.name == nBook.name && pBook.author == nBook.author) { pBook.addOrigin(nBook.origin) hasSame = true @@ -179,13 +161,13 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB } } } - if (!scope.isActive) return + coroutineContext.ensureActive() equalData.sortByDescending { it.origins.size } equalData.addAll(containsData.sortedByDescending { it.origins.size }) if (!precision) { equalData.addAll(otherData) } - if (!scope.isActive) return + coroutineContext.ensureActive() searchBooks = equalData } } @@ -196,7 +178,7 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB } fun close() { - tasks.clear() + searchJob?.cancel() searchPool?.close() searchPool = null mSearchId = 0L @@ -205,9 +187,9 @@ class SearchModel(private val scope: CoroutineScope, private val callBack: CallB interface CallBack { fun getSearchScope(): SearchScope fun onSearchStart() - fun onSearchSuccess(searchBooks: ArrayList) + fun onSearchSuccess(searchBooks: List) fun onSearchFinish(isEmpty: Boolean) - fun onSearchCancel(exception: Exception? = null) + fun onSearchCancel(exception: Throwable? = null) } } \ No newline at end of file diff --git a/app/src/main/java/io/legado/app/service/CheckSourceService.kt b/app/src/main/java/io/legado/app/service/CheckSourceService.kt index 310e432f7..ac57fab8c 100644 --- a/app/src/main/java/io/legado/app/service/CheckSourceService.kt +++ b/app/src/main/java/io/legado/app/service/CheckSourceService.kt @@ -35,8 +35,10 @@ import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch import org.mozilla.javascript.WrappedException @@ -112,9 +114,7 @@ class CheckSourceService : BaseService() { upNotification() }.onEachParallel(threadCount) { checkSource(it) - }.onCompletion { - stopSelf() - }.buffer(0).collect { + }.buffer(0).onEach { finishCount++ notificationMsg = getString( R.string.progress_show, @@ -124,7 +124,9 @@ class CheckSourceService : BaseService() { ) upNotification() appDb.bookSourceDao.update(it) - } + }.onCompletion { + stopSelf() + }.collect() } } diff --git a/app/src/main/java/io/legado/app/service/ExportBookService.kt b/app/src/main/java/io/legado/app/service/ExportBookService.kt index 5b117aa16..8ebab110c 100644 --- a/app/src/main/java/io/legado/app/service/ExportBookService.kt +++ b/app/src/main/java/io/legado/app/service/ExportBookService.kt @@ -58,6 +58,7 @@ import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.collectIndexed import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach @@ -338,8 +339,7 @@ class ExportBookService : BaseService() { }.onEach { it.start() } .buffer(threads) .map { it.await() } - .withIndex() - .collect { (index, result) -> + .collectIndexed { index, result -> postEvent(EventBus.EXPORT_BOOK, book.bookUrl) exportProgress[book.bookUrl] = index append.invoke(result.first, result.second) @@ -685,8 +685,7 @@ class ExportBookService : BaseService() { }.onEach { it.start() } .buffer(threads) .map { it.await() } - .withIndex() - .collect { (index, exportChapter) -> + .collectIndexed { index, exportChapter -> postEvent(EventBus.EXPORT_BOOK, book.bookUrl) exportProgress[book.bookUrl] = index epubBook.resources.addAll(exportChapter.resources) diff --git a/app/src/main/java/io/legado/app/ui/book/search/SearchActivity.kt b/app/src/main/java/io/legado/app/ui/book/search/SearchActivity.kt index 294d040a5..6fe813e35 100644 --- a/app/src/main/java/io/legado/app/ui/book/search/SearchActivity.kt +++ b/app/src/main/java/io/legado/app/ui/book/search/SearchActivity.kt @@ -306,6 +306,7 @@ class SearchActivity : VMBaseActivity { - val list = hashSetOf() + fun getBookSourceParts(): List { + val list = hashSetOf() if (scope.isEmpty()) { - list.addAll(appDb.bookSourceDao.allEnabled) + list.addAll(appDb.bookSourceDao.allEnabledPart) } else { if (scope.contains("::")) { scope.substringAfter("::").let { - appDb.bookSourceDao.getBookSource(it)?.let { source -> + appDb.bookSourceDao.getBookSourcePart(it)?.let { source -> list.add(source) } } } else { val oldScope = scope.splitNotBlank(",") val newScope = oldScope.filter { - val bookSources = appDb.bookSourceDao.getEnabledByGroup(it) + val bookSources = appDb.bookSourceDao.getEnabledPartByGroup(it) list.addAll(bookSources) bookSources.isNotEmpty() } @@ -126,7 +126,7 @@ data class SearchScope(private var scope: String) { } if (list.isEmpty()) { scope = "" - appDb.bookSourceDao.allEnabled.let { + appDb.bookSourceDao.allEnabledPart.let { if (it.isNotEmpty()) { stateLiveData.postValue(scope) list.addAll(it) diff --git a/app/src/main/java/io/legado/app/ui/book/search/SearchViewModel.kt b/app/src/main/java/io/legado/app/ui/book/search/SearchViewModel.kt index befd44c37..76a1c33c9 100644 --- a/app/src/main/java/io/legado/app/ui/book/search/SearchViewModel.kt +++ b/app/src/main/java/io/legado/app/ui/book/search/SearchViewModel.kt @@ -40,7 +40,7 @@ class SearchViewModel(application: Application) : BaseViewModel(application) { isSearchLiveData.postValue(true) } - override fun onSearchSuccess(searchBooks: ArrayList) { + override fun onSearchSuccess(searchBooks: List) { searchBookLiveData.postValue(searchBooks) } @@ -49,7 +49,7 @@ class SearchViewModel(application: Application) : BaseViewModel(application) { searchFinishLiveData.postValue(isEmpty) } - override fun onSearchCancel(exception: Exception?) { + override fun onSearchCancel(exception: Throwable?) { isSearchLiveData.postValue(false) exception?.let { context.toastOnUi(it.localizedMessage) @@ -95,6 +95,7 @@ class SearchViewModel(application: Application) : BaseViewModel(application) { if ((searchKey == key) || key.isNotEmpty()) { searchModel.cancelSearch() searchID = System.currentTimeMillis() + searchBookLiveData.postValue(emptyList()) searchKey = key } if (searchKey.isEmpty()) { diff --git a/app/src/main/java/io/legado/app/utils/FlowExtensions.kt b/app/src/main/java/io/legado/app/utils/FlowExtensions.kt index 5d366b3cf..b30152866 100644 --- a/app/src/main/java/io/legado/app/utils/FlowExtensions.kt +++ b/app/src/main/java/io/legado/app/utils/FlowExtensions.kt @@ -1,15 +1,14 @@ package io.legado.app.utils import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.flow.DEFAULT_CONCURRENCY import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow -@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) +@OptIn(ExperimentalCoroutinesApi::class) inline fun Flow.onEachParallel( - concurrency: Int = DEFAULT_CONCURRENCY, + concurrency: Int, crossinline action: suspend (T) -> Unit ): Flow = flatMapMerge(concurrency) { value -> return@flatMapMerge flow { @@ -17,3 +16,24 @@ inline fun Flow.onEachParallel( emit(value) } } + +@OptIn(ExperimentalCoroutinesApi::class) +inline fun Flow.mapParallel( + concurrency: Int, + crossinline transform: suspend (T) -> R, +): Flow = flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } } + +inline fun Flow.mapNotNullParallel( + concurrency: Int, + crossinline transform: suspend (T) -> R?, +): Flow = mapParallel(concurrency, transform).filterNotNull() + +inline fun Flow.onEachIndexed( + crossinline action: suspend (index: Int, T) -> Unit, +): Flow = flow { + var index = 0 + collect { value -> + action(index++, value) + emit(value) + } +} diff --git a/app/src/main/java/io/legado/app/web/socket/BookSearchWebSocket.kt b/app/src/main/java/io/legado/app/web/socket/BookSearchWebSocket.kt index abb83af7f..b1ed3f907 100644 --- a/app/src/main/java/io/legado/app/web/socket/BookSearchWebSocket.kt +++ b/app/src/main/java/io/legado/app/web/socket/BookSearchWebSocket.kt @@ -7,11 +7,16 @@ import io.legado.app.data.entities.SearchBook import io.legado.app.help.config.AppConfig import io.legado.app.model.webBook.SearchModel import io.legado.app.ui.book.search.SearchScope -import io.legado.app.utils.* -import kotlinx.coroutines.* +import io.legado.app.utils.GSON +import io.legado.app.utils.fromJsonObject +import io.legado.app.utils.isJson +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers.IO +import kotlinx.coroutines.MainScope +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import splitties.init.appCtx - import java.io.IOException class BookSearchWebSocket(handshakeRequest: NanoHTTPD.IHTTPSession) : @@ -81,12 +86,12 @@ class BookSearchWebSocket(handshakeRequest: NanoHTTPD.IHTTPSession) : } - override fun onSearchSuccess(searchBooks: ArrayList) { + override fun onSearchSuccess(searchBooks: List) { send(GSON.toJson(searchBooks)) } override fun onSearchFinish(isEmpty: Boolean) = close(normalClosure, SEARCH_FINISH, false) - override fun onSearchCancel(exception: Exception?) = close(normalClosure, exception?.toString() ?: SEARCH_FINISH, false) + override fun onSearchCancel(exception: Throwable?) = close(normalClosure, exception?.toString() ?: SEARCH_FINISH, false) }