Make SyncStatusTable an actor instead of using a serial dispatchQueue.

This commit is contained in:
Brent Simmons
2024-03-10 16:35:55 -07:00
parent 1a14d369bc
commit ee58096a48
3 changed files with 348 additions and 222 deletions

View File

@@ -8,60 +8,185 @@
import Foundation
import RSCore
import RSDatabase
import Database
public typealias SyncStatusesResult = Result<Array<SyncStatus>, DatabaseError>
public typealias SyncStatusesCompletionBlock = (SyncStatusesResult) -> Void
public typealias SyncStatusArticleIDsResult = Result<Set<String>, DatabaseError>
public typealias SyncStatusArticleIDsCompletionBlock = (SyncStatusArticleIDsResult) -> Void
public struct SyncDatabase {
public struct SyncDatabase: Sendable {
private let syncStatusTable: SyncStatusTable
private let queue: DatabaseQueue
public init(databaseFilePath: String) {
let queue = DatabaseQueue(databasePath: databaseFilePath)
try! queue.runCreateStatements(SyncDatabase.tableCreationStatements)
queue.vacuumIfNeeded(daysBetweenVacuums: 11)
self.queue = queue
self.syncStatusTable = SyncStatusTable(queue: queue)
self.syncStatusTable = SyncStatusTable(databasePath: databaseFilePath)
}
// MARK: - API
public func insertStatuses(_ statuses: [SyncStatus], completion: @escaping DatabaseCompletionBlock) {
syncStatusTable.insertStatuses(statuses, completion: completion)
public func insertStatuses(_ statuses: [SyncStatus]) async throws {
try await syncStatusTable.insertStatuses(statuses)
}
public func selectForProcessing(limit: Int? = nil) async throws -> Set<SyncStatus>? {
try await syncStatusTable.selectForProcessing(limit: limit)
}
public func selectPendingCount() async throws -> Int? {
try await syncStatusTable.selectPendingCount()
}
public func selectPendingReadStatusArticleIDs() async throws -> Set<String>? {
try await syncStatusTable.selectPendingReadStatusArticleIDs()
}
public func selectPendingStarredStatusArticleIDs() async throws -> Set<String>? {
try await syncStatusTable.selectPendingStarredStatusArticleIDs()
}
public func resetAllSelectedForProcessing() async throws {
try await syncStatusTable.resetAllSelectedForProcessing()
}
public func resetSelectedForProcessing(_ articleIDs: [String]) async throws {
try await syncStatusTable.resetSelectedForProcessing(articleIDs)
}
public func deleteSelectedForProcessing(_ articleIDs: [String]) async throws {
try await syncStatusTable.deleteSelectedForProcessing(articleIDs)
}
// MARK: - Suspend and Resume (for iOS)
/// Close the database and stop running database calls.
///
/// On Macs, suspend() and resume() do nothing. Theyre not needed.
public func suspend() async {
await syncStatusTable.suspend()
}
/// Open the database and allow for running database calls again.
public func resume() async {
await syncStatusTable.resume()
}
}
// MARK: - Compatibility
// Use the below until switching to the async version of the API.
public typealias SyncStatusesResult = Result<Array<SyncStatus>, DatabaseError>
public typealias SyncStatusesCompletionBlock = @Sendable (SyncStatusesResult) -> Void
public typealias SyncStatusArticleIDsResult = Result<Set<String>, DatabaseError>
public typealias SyncStatusArticleIDsCompletionBlock = @Sendable (SyncStatusArticleIDsResult) -> Void
extension SyncDatabase {
public func insertStatuses(_ statuses: [SyncStatus], completion: @escaping DatabaseCompletionBlock) {
Task {
do {
try await self.insertStatuses(statuses)
completion(nil)
} catch {
completion(DatabaseError.isSuspended)
}
}
}
public func selectForProcessing(limit: Int? = nil, completion: @escaping SyncStatusesCompletionBlock) {
return syncStatusTable.selectForProcessing(limit: limit, completion: completion)
Task {
do {
if let syncStatuses = try await self.selectForProcessing(limit: limit) {
completion(.success(Array(syncStatuses)))
} else {
completion(.success([SyncStatus]()))
}
} catch {
completion(.failure(DatabaseError.isSuspended))
}
}
}
public func selectPendingCount(completion: @escaping DatabaseIntCompletionBlock) {
syncStatusTable.selectPendingCount(completion)
Task {
do {
if let count = try await self.selectPendingCount() {
completion(.success(count))
} else {
completion(.success(0))
}
} catch {
completion(.failure(DatabaseError.isSuspended))
}
}
}
public func selectPendingReadStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) {
Task {
do {
if let articleIDs = try await self.selectPendingReadStatusArticleIDs() {
completion(.success(articleIDs))
} else {
completion(.success(Set<String>()))
}
} catch {
completion(.failure(DatabaseError.isSuspended))
}
}
}
public func selectPendingStarredStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) {
Task {
do {
if let articleIDs = try await self.selectPendingStarredStatusArticleIDs() {
completion(.success(articleIDs))
} else {
completion(.success(Set<String>()))
}
} catch {
completion(.failure(DatabaseError.isSuspended))
}
}
}
public func selectPendingReadStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) {
syncStatusTable.selectPendingReadStatusArticleIDs(completion: completion)
}
public func selectPendingStarredStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) {
syncStatusTable.selectPendingStarredStatusArticleIDs(completion: completion)
}
public func resetAllSelectedForProcessing(completion: DatabaseCompletionBlock? = nil) {
syncStatusTable.resetAllSelectedForProcessing(completion: completion)
Task {
do {
try await self.resetAllSelectedForProcessing()
completion?(nil)
} catch {
completion?(DatabaseError.isSuspended)
}
}
}
public func resetSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) {
syncStatusTable.resetSelectedForProcessing(articleIDs, completion: completion)
Task {
do {
try await self.resetSelectedForProcessing(articleIDs)
completion?(nil)
} catch {
completion?(DatabaseError.isSuspended)
}
}
}
public func deleteSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) {
syncStatusTable.deleteSelectedForProcessing(articleIDs, completion: completion)
public func deleteSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) {
Task {
do {
try await self.deleteSelectedForProcessing(articleIDs)
completion?(nil)
} catch {
completion?(DatabaseError.isSuspended)
}
}
}
// MARK: - Suspend and Resume (for iOS)
@@ -69,20 +194,17 @@ public struct SyncDatabase {
/// Close the database and stop running database calls.
/// Any pending calls will complete first.
public func suspend() {
queue.suspend()
Task {
await self.suspend()
}
}
/// Open the database and allow for running database calls again.
public func resume() {
queue.resume()
Task {
await self.resume()
}
}
}
// MARK: - Private
private extension SyncDatabase {
static let tableCreationStatements = """
CREATE TABLE if not EXISTS syncStatus (articleID TEXT NOT NULL, key TEXT NOT NULL, flag BOOL NOT NULL DEFAULT 0, selected BOOL NOT NULL DEFAULT 0, PRIMARY KEY (articleID, key));
"""
}

View File

@@ -8,11 +8,11 @@
import Foundation
import Articles
import RSDatabase
import Database
public struct SyncStatus: Hashable, Equatable {
public enum Key: String {
public struct SyncStatus: Hashable, Equatable, Sendable {
public enum Key: String, Sendable {
case read = "read"
case starred = "starred"
case deleted = "deleted"

View File

@@ -9,183 +9,213 @@
import Foundation
import RSCore
import Articles
import RSDatabase
import RSDatabaseObjC
import Database
import FMDB
struct SyncStatusTable: DatabaseTable {
extension FMDatabase {
let name = DatabaseTableName.syncStatus
private let queue: DatabaseQueue
static func openAndSetUpDatabase(path: String) -> FMDatabase {
init(queue: DatabaseQueue) {
self.queue = queue
let database = FMDatabase(path: path)!
database.open()
database.executeStatements("PRAGMA synchronous = 1;")
database.setShouldCacheStatements(true)
return database
}
func selectForProcessing(limit: Int?, completion: @escaping SyncStatusesCompletionBlock) {
queue.runInTransaction { databaseResult in
var statuses = Set<SyncStatus>()
var error: DatabaseError?
func executeUpdateInTransaction(_ sql : String, withArgumentsIn parameters: [Any]?) {
func makeDatabaseCall(_ database: FMDatabase) {
let updateSQL = "update syncStatus set selected = true"
database.executeUpdate(updateSQL, withArgumentsIn: nil)
var selectSQL = "select * from syncStatus where selected == true"
if let limit = limit {
selectSQL = "\(selectSQL) limit \(limit)"
}
if let resultSet = database.executeQuery(selectSQL, withArgumentsIn: nil) {
statuses = resultSet.mapToSet(self.statusWithRow)
}
}
switch databaseResult {
case .success(let database):
makeDatabaseCall(database)
case .failure(let databaseError):
error = databaseError
}
DispatchQueue.main.async {
if let error = error {
completion(.failure(error))
}
else {
completion(.success(Array(statuses)))
}
}
}
beginTransaction()
executeUpdate(sql, withArgumentsIn: parameters)
commit()
}
func selectPendingCount(_ completion: @escaping DatabaseIntCompletionBlock) {
queue.runInDatabase { databaseResult in
var count: Int = 0
var error: DatabaseError?
func makeDatabaseCall(_ database: FMDatabase) {
let sql = "select count(*) from syncStatus"
if let resultSet = database.executeQuery(sql, withArgumentsIn: nil) {
count = self.numberWithCountResultSet(resultSet)
}
}
func vacuum() {
switch databaseResult {
case .success(let database):
makeDatabaseCall(database)
case .failure(let databaseError):
error = databaseError
}
executeStatements("vacuum;")
}
DispatchQueue.main.async {
if let error = error {
completion(.failure(error))
}
else {
completion(.success(count))
}
func runCreateStatements(_ statements: String) {
statements.enumerateLines { (line, stop) in
if line.lowercased().hasPrefix("create") {
self.executeStatements(line)
}
stop = false
}
}
func selectPendingReadStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) {
selectPendingArticleIDsAsync(.read, completion)
}
func selectPendingStarredStatusArticleIDs(completion: @escaping SyncStatusArticleIDsCompletionBlock) {
selectPendingArticleIDsAsync(.starred, completion)
}
func resetAllSelectedForProcessing(completion: DatabaseCompletionBlock? = nil) {
queue.runInTransaction { databaseResult in
func insertRows(_ dictionaries: [DatabaseDictionary], insertType: RSDatabaseInsertType, tableName: String) {
func makeDatabaseCall(_ database: FMDatabase) {
let updateSQL = "update syncStatus set selected = false"
database.executeUpdate(updateSQL, withArgumentsIn: nil)
}
switch databaseResult {
case .success(let database):
makeDatabaseCall(database)
callCompletion(completion, nil)
case .failure(let databaseError):
callCompletion(completion, databaseError)
}
for dictionary in dictionaries {
_ = rs_insertRow(with: dictionary, insertType: insertType, tableName: tableName)
}
}
}
extension FMResultSet {
func intWithCountResult() -> Int? {
guard next() else {
return nil
}
return Int(long(forColumnIndex: 0))
}
}
actor SyncStatusTable {
static private let tableName = "syncStatus"
private var database: FMDatabase?
private let databasePath: String
init(databasePath: String) {
let database = FMDatabase.openAndSetUpDatabase(path: databasePath)
database.runCreateStatements(SyncStatusTable.creationStatements)
database.vacuum()
self.database = database
self.databasePath = databasePath
}
func suspend() {
#if os(iOS)
database?.close()
database = nil
#endif
}
func resume() {
#if os(iOS)
if database == nil {
self.database = FMDatabase.openAndSetUpDatabase(path: databasePath)
}
#endif
}
func close() {
database?.close()
}
func selectForProcessing(limit: Int?) throws -> Set<SyncStatus>? {
guard let database else {
throw DatabaseError.isSuspended
}
let updateSQL = "update syncStatus set selected = true"
database.executeUpdateInTransaction(updateSQL, withArgumentsIn: nil)
let selectSQL = {
var sql = "select * from syncStatus where selected == true"
if let limit {
sql = "\(sql) limit \(limit)"
}
return sql
}()
guard let resultSet = database.executeQuery(selectSQL, withArgumentsIn: nil) else {
return nil
}
let statuses = resultSet.mapToSet(self.statusWithRow)
return statuses
}
func selectPendingCount() throws -> Int? {
guard let database else {
throw DatabaseError.isSuspended
}
let sql = "select count(*) from syncStatus"
guard let resultSet = database.executeQuery(sql, withArgumentsIn: nil) else {
return nil
}
let count = resultSet.intWithCountResult()
return count
}
func selectPendingReadStatusArticleIDs() throws -> Set<String>? {
try selectPendingArticleIDs(.read)
}
func selectPendingStarredStatusArticleIDs() throws -> Set<String>? {
try selectPendingArticleIDs(.starred)
}
func resetAllSelectedForProcessing() throws {
guard let database else {
throw DatabaseError.isSuspended
}
let updateSQL = "update syncStatus set selected = false"
database.executeUpdateInTransaction(updateSQL, withArgumentsIn: nil)
}
func resetSelectedForProcessing(_ articleIDs: [String]) throws {
func resetSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) {
guard !articleIDs.isEmpty else {
callCompletion(completion, nil)
return
}
queue.runInTransaction { databaseResult in
func makeDatabaseCall(_ database: FMDatabase) {
let parameters = articleIDs.map { $0 as AnyObject }
let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))!
let updateSQL = "update syncStatus set selected = false where articleID in \(placeholders)"
database.executeUpdate(updateSQL, withArgumentsIn: parameters)
}
switch databaseResult {
case .success(let database):
makeDatabaseCall(database)
callCompletion(completion, nil)
case .failure(let databaseError):
callCompletion(completion, databaseError)
}
guard let database else {
throw DatabaseError.isSuspended
}
let parameters = articleIDs.map { $0 as AnyObject }
let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))!
let updateSQL = "update syncStatus set selected = false where articleID in \(placeholders)"
database.executeUpdateInTransaction(updateSQL, withArgumentsIn: parameters)
}
func deleteSelectedForProcessing(_ articleIDs: [String], completion: DatabaseCompletionBlock? = nil) {
func deleteSelectedForProcessing(_ articleIDs: [String]) throws {
guard !articleIDs.isEmpty else {
callCompletion(completion, nil)
return
}
queue.runInTransaction { databaseResult in
func makeDatabaseCall(_ database: FMDatabase) {
let parameters = articleIDs.map { $0 as AnyObject }
let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))!
let deleteSQL = "delete from syncStatus where selected = true and articleID in \(placeholders)"
database.executeUpdate(deleteSQL, withArgumentsIn: parameters)
}
switch databaseResult {
case .success(let database):
makeDatabaseCall(database)
callCompletion(completion, nil)
case .failure(let databaseError):
callCompletion(completion, databaseError)
}
guard let database else {
throw DatabaseError.isSuspended
}
let parameters = articleIDs.map { $0 as AnyObject }
let placeholders = NSString.rs_SQLValueList(withPlaceholders: UInt(articleIDs.count))!
let deleteSQL = "delete from syncStatus where selected = true and articleID in \(placeholders)"
database.executeUpdateInTransaction(deleteSQL, withArgumentsIn: parameters)
}
func insertStatuses(_ statuses: [SyncStatus], completion: @escaping DatabaseCompletionBlock) {
queue.runInTransaction { databaseResult in
func makeDatabaseCall(_ database: FMDatabase) {
let statusArray = statuses.map { $0.databaseDictionary() }
self.insertRows(statusArray, insertType: .orReplace, in: database)
}
func insertStatuses(_ statuses: [SyncStatus]) throws {
switch databaseResult {
case .success(let database):
makeDatabaseCall(database)
callCompletion(completion, nil)
case .failure(let databaseError):
callCompletion(completion, databaseError)
}
guard let database else {
throw DatabaseError.isSuspended
}
database.beginTransaction()
let statusArray = statuses.map { $0.databaseDictionary() }
database.insertRows(statusArray, insertType: .orReplace, tableName: Self.tableName)
database.commit()
}
}
private extension SyncStatusTable {
static let creationStatements = """
CREATE TABLE if not EXISTS syncStatus (articleID TEXT NOT NULL, key TEXT NOT NULL, flag BOOL NOT NULL DEFAULT 0, selected BOOL NOT NULL DEFAULT 0, PRIMARY KEY (articleID, key));
"""
func statusWithRow(_ row: FMResultSet) -> SyncStatus? {
guard let articleID = row.string(forColumn: DatabaseKey.articleID),
let rawKey = row.string(forColumn: DatabaseKey.key),
let key = SyncStatus.Key(rawValue: rawKey) else {
@@ -197,45 +227,19 @@ private extension SyncStatusTable {
return SyncStatus(articleID: articleID, key: key, flag: flag, selected: selected)
}
func selectPendingArticleIDsAsync(_ statusKey: ArticleStatus.Key, _ completion: @escaping SyncStatusArticleIDsCompletionBlock) {
queue.runInDatabase { databaseResult in
func selectPendingArticleIDs(_ statusKey: ArticleStatus.Key) throws -> Set<String>? {
func makeDatabaseCall(_ database: FMDatabase) {
let sql = "select articleID from syncStatus where selected == false and key = \"\(statusKey.rawValue)\";"
guard let database else {
throw DatabaseError.isSuspended
}
guard let resultSet = database.executeQuery(sql, withArgumentsIn: nil) else {
DispatchQueue.main.async {
completion(.success(Set<String>()))
}
return
}
let sql = "select articleID from syncStatus where selected == false and key = \"\(statusKey.rawValue)\";"
guard let resultSet = database.executeQuery(sql, withArgumentsIn: nil) else {
return nil
}
let articleIDs = resultSet.mapToSet{ $0.string(forColumnIndex: 0) }
DispatchQueue.main.async {
completion(.success(articleIDs))
}
}
switch databaseResult {
case .success(let database):
makeDatabaseCall(database)
case .failure(let databaseError):
DispatchQueue.main.async {
completion(.failure(databaseError))
}
}
}
}
}
private func callCompletion(_ completion: DatabaseCompletionBlock?, _ databaseError: DatabaseError?) {
guard let completion = completion else {
return
}
DispatchQueue.main.async {
completion(databaseError)
let articleIDs = resultSet.mapToSet{ $0.string(forColumnIndex: 0) }
return articleIDs
}
}