From ee58096a482e5a18d3f65e470d51cdd654daf691 Mon Sep 17 00:00:00 2001 From: Brent Simmons Date: Sun, 10 Mar 2024 16:35:55 -0700 Subject: [PATCH] Make SyncStatusTable an actor instead of using a serial dispatchQueue. --- .../Sources/SyncDatabase/SyncDatabase.swift | 206 +++++++--- .../Sources/SyncDatabase/SyncStatus.swift | 8 +- .../SyncDatabase/SyncStatusTable.swift | 356 +++++++++--------- 3 files changed, 348 insertions(+), 222 deletions(-) diff --git a/SyncDatabase/Sources/SyncDatabase/SyncDatabase.swift b/SyncDatabase/Sources/SyncDatabase/SyncDatabase.swift index 93726a113..79feb5507 100644 --- a/SyncDatabase/Sources/SyncDatabase/SyncDatabase.swift +++ b/SyncDatabase/Sources/SyncDatabase/SyncDatabase.swift @@ -8,60 +8,185 @@ import Foundation import RSCore -import RSDatabase +import Database -public typealias SyncStatusesResult = Result, DatabaseError> -public typealias SyncStatusesCompletionBlock = (SyncStatusesResult) -> Void - -public typealias SyncStatusArticleIDsResult = Result, DatabaseError> -public typealias SyncStatusArticleIDsCompletionBlock = (SyncStatusArticleIDsResult) -> Void - -public struct SyncDatabase { +public struct SyncDatabase: Sendable { private let syncStatusTable: SyncStatusTable - private let queue: DatabaseQueue public init(databaseFilePath: String) { - let queue = DatabaseQueue(databasePath: databaseFilePath) - try! queue.runCreateStatements(SyncDatabase.tableCreationStatements) - queue.vacuumIfNeeded(daysBetweenVacuums: 11) - self.queue = queue - self.syncStatusTable = SyncStatusTable(queue: queue) + self.syncStatusTable = SyncStatusTable(databasePath: databaseFilePath) } // MARK: - API - public func insertStatuses(_ statuses: [SyncStatus], completion: @escaping DatabaseCompletionBlock) { - syncStatusTable.insertStatuses(statuses, completion: completion) + public func insertStatuses(_ statuses: [SyncStatus]) async throws { + try await syncStatusTable.insertStatuses(statuses) } + public func selectForProcessing(limit: Int? = nil) async throws -> Set? { + try await syncStatusTable.selectForProcessing(limit: limit) + } + + public func selectPendingCount() async throws -> Int? { + try await syncStatusTable.selectPendingCount() + } + + public func selectPendingReadStatusArticleIDs() async throws -> Set? { + try await syncStatusTable.selectPendingReadStatusArticleIDs() + } + + public func selectPendingStarredStatusArticleIDs() async throws -> Set? { + try await syncStatusTable.selectPendingStarredStatusArticleIDs() + } + + public func resetAllSelectedForProcessing() async throws { + try await syncStatusTable.resetAllSelectedForProcessing() + } + + public func resetSelectedForProcessing(_ articleIDs: [String]) async throws { + try await syncStatusTable.resetSelectedForProcessing(articleIDs) + } + + public func deleteSelectedForProcessing(_ articleIDs: [String]) async throws { + try await syncStatusTable.deleteSelectedForProcessing(articleIDs) + } + + // MARK: - Suspend and Resume (for iOS) + + /// Close the database and stop running database calls. + /// + /// On Macs, suspend() and resume() do nothing. They’re not needed. + public func suspend() async { + await syncStatusTable.suspend() + } + + /// Open the database and allow for running database calls again. + public func resume() async { + await syncStatusTable.resume() + } +} + +// MARK: - Compatibility + +// Use the below until switching to the async version of the API. + +public typealias SyncStatusesResult = Result, DatabaseError> +public typealias SyncStatusesCompletionBlock = @Sendable (SyncStatusesResult) -> Void + +public typealias SyncStatusArticleIDsResult = Result, DatabaseError> +public typealias SyncStatusArticleIDsCompletionBlock = @Sendable (SyncStatusArticleIDsResult) -> Void + +extension SyncDatabase { + + public func insertStatuses(_ statuses: [SyncStatus], completion: @escaping DatabaseCompletionBlock) { + + Task { + do { + try await self.insertStatuses(statuses) + completion(nil) + } catch { + completion(DatabaseError.isSuspended) + } + } + } + public func selectForProcessing(limit: Int? = nil, completion: @escaping SyncStatusesCompletionBlock) { - return syncStatusTable.selectForProcessing(limit: limit, completion: completion) + + Task { + do { + if let syncStatuses = try await self.selectForProcessing(limit: limit) { + completion(.success(Array(syncStatuses))) + } else { + completion(.success([SyncStatus]())) + } + } catch { + completion(.failure(DatabaseError.isSuspended)) + } + } } public func selectPendingCount(completion: @escaping DatabaseIntCompletionBlock) { - syncStatusTable.selectPendingCount(completion) + + Task { + do { + if let count = try await self.selectPendingCount() { + completion(.success(count)) + } else { + completion(.success(0)) + } + + } catch { + completion(.failure(DatabaseError.isSuspended)) + } + } + } + + public func selectPendingReadStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) { + + Task { + do { + if let articleIDs = try await self.selectPendingReadStatusArticleIDs() { + completion(.success(articleIDs)) + } else { + completion(.success(Set())) + } + } catch { + completion(.failure(DatabaseError.isSuspended)) + } + } + } + + public func selectPendingStarredStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) { + + Task { + do { + if let articleIDs = try await self.selectPendingStarredStatusArticleIDs() { + completion(.success(articleIDs)) + } else { + completion(.success(Set())) + } + } catch { + completion(.failure(DatabaseError.isSuspended)) + } + } } - public func selectPendingReadStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) { - syncStatusTable.selectPendingReadStatusArticleIDs(completion: completion) - } - - public func selectPendingStarredStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) { - syncStatusTable.selectPendingStarredStatusArticleIDs(completion: completion) - } - public func resetAllSelectedForProcessing(completion: DatabaseCompletionBlock? = nil) { - syncStatusTable.resetAllSelectedForProcessing(completion: completion) + + Task { + do { + try await self.resetAllSelectedForProcessing() + completion?(nil) + } catch { + completion?(DatabaseError.isSuspended) + } + } } public func resetSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) { - syncStatusTable.resetSelectedForProcessing(articleIDs, completion: completion) + + Task { + do { + try await self.resetSelectedForProcessing(articleIDs) + completion?(nil) + } catch { + completion?(DatabaseError.isSuspended) + } + } } - - public func deleteSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) { - syncStatusTable.deleteSelectedForProcessing(articleIDs, completion: completion) + + public func deleteSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) { + + Task { + do { + try await self.deleteSelectedForProcessing(articleIDs) + completion?(nil) + } catch { + completion?(DatabaseError.isSuspended) + } + } } // MARK: - Suspend and Resume (for iOS) @@ -69,20 +194,17 @@ public struct SyncDatabase { /// Close the database and stop running database calls. /// Any pending calls will complete first. public func suspend() { - queue.suspend() + + Task { + await self.suspend() + } } /// Open the database and allow for running database calls again. public func resume() { - queue.resume() + + Task { + await self.resume() + } } } - -// MARK: - Private - -private extension SyncDatabase { - - static let tableCreationStatements = """ - CREATE TABLE if not EXISTS syncStatus (articleID TEXT NOT NULL, key TEXT NOT NULL, flag BOOL NOT NULL DEFAULT 0, selected BOOL NOT NULL DEFAULT 0, PRIMARY KEY (articleID, key)); - """ -} diff --git a/SyncDatabase/Sources/SyncDatabase/SyncStatus.swift b/SyncDatabase/Sources/SyncDatabase/SyncStatus.swift index cb6c71a45..1b0d66d39 100644 --- a/SyncDatabase/Sources/SyncDatabase/SyncStatus.swift +++ b/SyncDatabase/Sources/SyncDatabase/SyncStatus.swift @@ -8,11 +8,11 @@ import Foundation import Articles -import RSDatabase +import Database -public struct SyncStatus: Hashable, Equatable { - - public enum Key: String { +public struct SyncStatus: Hashable, Equatable, Sendable { + + public enum Key: String, Sendable { case read = "read" case starred = "starred" case deleted = "deleted" diff --git a/SyncDatabase/Sources/SyncDatabase/SyncStatusTable.swift b/SyncDatabase/Sources/SyncDatabase/SyncStatusTable.swift index 0ab0d80cb..ff86f49b5 100644 --- a/SyncDatabase/Sources/SyncDatabase/SyncStatusTable.swift +++ b/SyncDatabase/Sources/SyncDatabase/SyncStatusTable.swift @@ -9,183 +9,213 @@ import Foundation import RSCore import Articles -import RSDatabase -import RSDatabaseObjC +import Database +import FMDB -struct SyncStatusTable: DatabaseTable { +extension FMDatabase { - let name = DatabaseTableName.syncStatus - private let queue: DatabaseQueue + static func openAndSetUpDatabase(path: String) -> FMDatabase { - init(queue: DatabaseQueue) { - self.queue = queue + let database = FMDatabase(path: path)! + + database.open() + database.executeStatements("PRAGMA synchronous = 1;") + database.setShouldCacheStatements(true) + + return database } - func selectForProcessing(limit: Int?, completion: @escaping SyncStatusesCompletionBlock) { - queue.runInTransaction { databaseResult in - var statuses = Set() - var error: DatabaseError? + func executeUpdateInTransaction(_ sql : String, withArgumentsIn parameters: [Any]?) { - func makeDatabaseCall(_ database: FMDatabase) { - let updateSQL = "update syncStatus set selected = true" - database.executeUpdate(updateSQL, withArgumentsIn: nil) - - var selectSQL = "select * from syncStatus where selected == true" - if let limit = limit { - selectSQL = "\(selectSQL) limit \(limit)" - } - if let resultSet = database.executeQuery(selectSQL, withArgumentsIn: nil) { - statuses = resultSet.mapToSet(self.statusWithRow) - } - } - - switch databaseResult { - case .success(let database): - makeDatabaseCall(database) - case .failure(let databaseError): - error = databaseError - } - - DispatchQueue.main.async { - if let error = error { - completion(.failure(error)) - } - else { - completion(.success(Array(statuses))) - } - } - } + beginTransaction() + executeUpdate(sql, withArgumentsIn: parameters) + commit() } - - func selectPendingCount(_ completion: @escaping DatabaseIntCompletionBlock) { - queue.runInDatabase { databaseResult in - var count: Int = 0 - var error: DatabaseError? - func makeDatabaseCall(_ database: FMDatabase) { - let sql = "select count(*) from syncStatus" - if let resultSet = database.executeQuery(sql, withArgumentsIn: nil) { - count = self.numberWithCountResultSet(resultSet) - } - } + func vacuum() { - switch databaseResult { - case .success(let database): - makeDatabaseCall(database) - case .failure(let databaseError): - error = databaseError - } + executeStatements("vacuum;") + } - DispatchQueue.main.async { - if let error = error { - completion(.failure(error)) - } - else { - completion(.success(count)) - } + func runCreateStatements(_ statements: String) { + + statements.enumerateLines { (line, stop) in + if line.lowercased().hasPrefix("create") { + self.executeStatements(line) } + stop = false } } - func selectPendingReadStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) { - selectPendingArticleIDsAsync(.read, completion) - } - - func selectPendingStarredStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) { - selectPendingArticleIDsAsync(.starred, completion) - } - - func resetAllSelectedForProcessing(completion: DatabaseCompletionBlock? = nil) { - queue.runInTransaction { databaseResult in + func insertRows(_ dictionaries: [DatabaseDictionary], insertType: RSDatabaseInsertType, tableName: String) { - func makeDatabaseCall(_ database: FMDatabase) { - let updateSQL = "update syncStatus set selected = false" - database.executeUpdate(updateSQL, withArgumentsIn: nil) - } - - switch databaseResult { - case .success(let database): - makeDatabaseCall(database) - callCompletion(completion, nil) - case .failure(let databaseError): - callCompletion(completion, databaseError) - } + for dictionary in dictionaries { + _ = rs_insertRow(with: dictionary, insertType: insertType, tableName: tableName) } } +} + +extension FMResultSet { + + func intWithCountResult() -> Int? { + + guard next() else { + return nil + } + + return Int(long(forColumnIndex: 0)) + } +} + +actor SyncStatusTable { + + static private let tableName = "syncStatus" + + private var database: FMDatabase? + private let databasePath: String + + init(databasePath: String) { + + let database = FMDatabase.openAndSetUpDatabase(path: databasePath) + database.runCreateStatements(SyncStatusTable.creationStatements) + database.vacuum() + + self.database = database + self.databasePath = databasePath + } + + func suspend() { +#if os(iOS) + database?.close() + database = nil +#endif + } + + func resume() { +#if os(iOS) + if database == nil { + self.database = FMDatabase.openAndSetUpDatabase(path: databasePath) + } +#endif + } + + func close() { + + database?.close() + } + + func selectForProcessing(limit: Int?) throws -> Set? { + + guard let database else { + throw DatabaseError.isSuspended + } + + let updateSQL = "update syncStatus set selected = true" + database.executeUpdateInTransaction(updateSQL, withArgumentsIn: nil) + + let selectSQL = { + var sql = "select * from syncStatus where selected == true" + if let limit { + sql = "\(sql) limit \(limit)" + } + return sql + }() + + guard let resultSet = database.executeQuery(selectSQL, withArgumentsIn: nil) else { + return nil + } + let statuses = resultSet.mapToSet(self.statusWithRow) + return statuses + } + + func selectPendingCount() throws -> Int? { + + guard let database else { + throw DatabaseError.isSuspended + } + + let sql = "select count(*) from syncStatus" + guard let resultSet = database.executeQuery(sql, withArgumentsIn: nil) else { + return nil + } + + let count = resultSet.intWithCountResult() + return count + } + + func selectPendingReadStatusArticleIDs() throws -> Set? { + try selectPendingArticleIDs(.read) + } + + func selectPendingStarredStatusArticleIDs() throws -> Set? { + try selectPendingArticleIDs(.starred) + } + + func resetAllSelectedForProcessing() throws { + + guard let database else { + throw DatabaseError.isSuspended + } + + let updateSQL = "update syncStatus set selected = false" + database.executeUpdateInTransaction(updateSQL, withArgumentsIn: nil) + } + + func resetSelectedForProcessing(_ articleIDs: [String]) throws { - func resetSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) { guard !articleIDs.isEmpty else { - callCompletion(completion, nil) return } - - queue.runInTransaction { databaseResult in - - func makeDatabaseCall(_ database: FMDatabase) { - let parameters = articleIDs.map { $0 as AnyObject } - let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))! - let updateSQL = "update syncStatus set selected = false where articleID in \(placeholders)" - database.executeUpdate(updateSQL, withArgumentsIn: parameters) - } - - switch databaseResult { - case .success(let database): - makeDatabaseCall(database) - callCompletion(completion, nil) - case .failure(let databaseError): - callCompletion(completion, databaseError) - } + guard let database else { + throw DatabaseError.isSuspended } + + let parameters = articleIDs.map { $0 as AnyObject } + let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))! + let updateSQL = "update syncStatus set selected = false where articleID in \(placeholders)" + + database.executeUpdateInTransaction(updateSQL, withArgumentsIn: parameters) } - - func deleteSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) { + + func deleteSelectedForProcessing(_ articleIDs: [String]) throws { + guard !articleIDs.isEmpty else { - callCompletion(completion, nil) return } - - queue.runInTransaction { databaseResult in - - func makeDatabaseCall(_ database: FMDatabase) { - let parameters = articleIDs.map { $0 as AnyObject } - let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))! - let deleteSQL = "delete from syncStatus where selected = true and articleID in \(placeholders)" - database.executeUpdate(deleteSQL, withArgumentsIn: parameters) - } - - switch databaseResult { - case .success(let database): - makeDatabaseCall(database) - callCompletion(completion, nil) - case .failure(let databaseError): - callCompletion(completion, databaseError) - } + guard let database else { + throw DatabaseError.isSuspended } + + let parameters = articleIDs.map { $0 as AnyObject } + let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))! + let deleteSQL = "delete from syncStatus where selected = true and articleID in \(placeholders)" + + database.executeUpdateInTransaction(deleteSQL, withArgumentsIn: parameters) } - - func insertStatuses(_ statuses: [SyncStatus], completion: @escaping DatabaseCompletionBlock) { - queue.runInTransaction { databaseResult in - func makeDatabaseCall(_ database: FMDatabase) { - let statusArray = statuses.map { $0.databaseDictionary() } - self.insertRows(statusArray, insertType: .orReplace, in: database) - } + func insertStatuses(_ statuses: [SyncStatus]) throws { - switch databaseResult { - case .success(let database): - makeDatabaseCall(database) - callCompletion(completion, nil) - case .failure(let databaseError): - callCompletion(completion, databaseError) - } + guard let database else { + throw DatabaseError.isSuspended } + + database.beginTransaction() + + let statusArray = statuses.map { $0.databaseDictionary() } + database.insertRows(statusArray, insertType: .orReplace, tableName: Self.tableName) + + database.commit() } - } private extension SyncStatusTable { + static let creationStatements = """ + CREATE TABLE if not EXISTS syncStatus (articleID TEXT NOT NULL, key TEXT NOT NULL, flag BOOL NOT NULL DEFAULT 0, selected BOOL NOT NULL DEFAULT 0, PRIMARY KEY (articleID, key)); + """ + func statusWithRow(_ row: FMResultSet) -> SyncStatus? { + guard let articleID = row.string(forColumn: DatabaseKey.articleID), let rawKey = row.string(forColumn: DatabaseKey.key), let key = SyncStatus.Key(rawValue: rawKey) else { @@ -197,45 +227,19 @@ private extension SyncStatusTable { return SyncStatus(articleID: articleID, key: key, flag: flag, selected: selected) } - - func selectPendingArticleIDsAsync(_ statusKey: ArticleStatus.Key, _ completion: @escaping SyncStatusArticleIDsCompletionBlock) { - queue.runInDatabase { databaseResult in + func selectPendingArticleIDs(_ statusKey: ArticleStatus.Key) throws -> Set? { - func makeDatabaseCall(_ database: FMDatabase) { - let sql = "select articleID from syncStatus where selected == false and key = \"\(statusKey.rawValue)\";" + guard let database else { + throw DatabaseError.isSuspended + } - guard let resultSet = database.executeQuery(sql, withArgumentsIn: nil) else { - DispatchQueue.main.async { - completion(.success(Set())) - } - return - } + let sql = "select articleID from syncStatus where selected == false and key = \"\(statusKey.rawValue)\";" + guard let resultSet = database.executeQuery(sql, withArgumentsIn: nil) else { + return nil + } - let articleIDs = resultSet.mapToSet{ $0.string(forColumnIndex: 0) } - DispatchQueue.main.async { - completion(.success(articleIDs)) - } - } - - switch databaseResult { - case .success(let database): - makeDatabaseCall(database) - case .failure(let databaseError): - DispatchQueue.main.async { - completion(.failure(databaseError)) - } - } - } - } - -} - -private func callCompletion(_ completion: DatabaseCompletionBlock?, _ databaseError: DatabaseError?) { - guard let completion = completion else { - return - } - DispatchQueue.main.async { - completion(databaseError) + let articleIDs = resultSet.mapToSet{ $0.string(forColumnIndex: 0) } + return articleIDs } }