diff --git a/Frameworks/Account/Account.swift b/Frameworks/Account/Account.swift index 288943296..cc4fc37f4 100644 --- a/Frameworks/Account/Account.swift +++ b/Frameworks/Account/Account.swift @@ -166,6 +166,10 @@ public final class Account: DisplayNameProvider, UnreadCountProvider, Container, } return _externalIDToWebFeedDictionary } + + var flattenedWebFeedURLs: Set { + return Set(flattenedWebFeeds().map({ $0.url })) + } var username: String? { get { diff --git a/Frameworks/Account/CloudKit/CloudKitAccountDelegate.swift b/Frameworks/Account/CloudKit/CloudKitAccountDelegate.swift index 9d456e461..37bc811a9 100644 --- a/Frameworks/Account/CloudKit/CloudKitAccountDelegate.swift +++ b/Frameworks/Account/CloudKit/CloudKitAccountDelegate.swift @@ -178,7 +178,9 @@ final class CloudKitAccountDelegate: AccountDelegate { let normalizedItems = OPMLNormalizer.normalize(opmlItems) - var webFeedURLs = Set() + // Combine all existing web feed URLs with all the new ones + + var webFeedURLs = account.flattenedWebFeedURLs for opmlItem in normalizedItems { if let webFeedURL = opmlItem.feedSpecifier?.feedURL { webFeedURLs.insert(webFeedURL) @@ -193,37 +195,18 @@ final class CloudKitAccountDelegate: AccountDelegate { } } - refreshProgress.addToNumberOfTasksAndRemaining(webFeedURLs.count + 1) - var errorOccurred = false +// os_log(.error, log: self.log, "Error while subscribing to the feed: %@", error.localizedDescription) - // You have to single thread these or CloudKit gets overwhelmed and freaks out - func takeOneAndPassItOn(_ webFeedURLs: [String], completion: @escaping () -> Void) { - var remainingWebFeedURLS = webFeedURLs - - if let webFeedURL = remainingWebFeedURLS.popLast() { - publicZone.createSubscription(webFeedURL) { result in - self.refreshProgress.completeTask() - if case .failure(let error) = result { - os_log(.error, log: self.log, "Error while subscribing to the feed: %@", error.localizedDescription) - errorOccurred = true - } - takeOneAndPassItOn(remainingWebFeedURLS, completion: completion) - } - } else { - completion() - } - - } - - takeOneAndPassItOn(Array(webFeedURLs)) { - if errorOccurred { - self.refreshProgress.completeTask() - completion(.failure(CloudKitZoneError.unknown)) - } else { + refreshProgress.addToNumberOfTasksAndRemaining(2) + publicZone.manageSubscriptions(webFeedURLs) { result in + self.refreshProgress.completeTask() + switch result { + case .success: self.accountZone.importOPML(rootExternalID: rootExternalID, items: normalizedItems) { _ in - self.refreshProgress.completeTask() - completion(.success(())) + self.refreshAll(for: account, completion: completion) } + case .failure(let error): + completion(.failure(error)) } } @@ -269,13 +252,13 @@ final class CloudKitAccountDelegate: AccountDelegate { feed.externalID = externalID container.addWebFeed(feed) - self.publicZone.createSubscription(feed.url) { result in + self.publicZone.manageSubscriptions(account.flattenedWebFeedURLs) { result in self.refreshProgress.completeTask() if case .failure(let error) = result { os_log(.error, log: self.log, "An error occurred while creating the subscription: %@", error.localizedDescription) } } - + InitialFeedDownloader.download(url) { parsedFeed in self.refreshProgress.completeTask() @@ -328,7 +311,7 @@ final class CloudKitAccountDelegate: AccountDelegate { case .success(let deleted): container.removeWebFeed(feed) if deleted { - self.publicZone.removeSubscription(feed, completion: completion) + self.publicZone.manageSubscriptions(account.flattenedWebFeedURLs, completion: completion) } else { completion(.success(())) } @@ -490,7 +473,7 @@ final class CloudKitAccountDelegate: AccountDelegate { } func accountDidInitialize(_ account: Account) { - accountZone.delegate = CloudKitAcountZoneDelegate(account: account, publicZone: publicZone, refreshProgress: refreshProgress) + accountZone.delegate = CloudKitAcountZoneDelegate(account: account, refreshProgress: refreshProgress) articlesZone.delegate = CloudKitArticlesZoneDelegate(account: account, database: database, articlesZone: articlesZone) // Check to see if this is a new account and initialize anything we need diff --git a/Frameworks/Account/CloudKit/CloudKitPublicZone.swift b/Frameworks/Account/CloudKit/CloudKitPublicZone.swift index ca6a76a3e..05ef0d7d7 100644 --- a/Frameworks/Account/CloudKit/CloudKitPublicZone.swift +++ b/Frameworks/Account/CloudKit/CloudKitPublicZone.swift @@ -59,106 +59,47 @@ final class CloudKitPublicZone: CloudKitZone { completion() } - /// Create a CloudKit subscription for the webfeed and any other supporting records that we need - func createSubscription(_ webFeedURL: String, completion: @escaping (Result) -> Void) { - let webFeedURLMD5String = webFeedURL.md5String + /// Create any new subscriptions and delete any old ones + func manageSubscriptions(_ webFeedURLs: Set, completion: @escaping (Result) -> Void) { - func createSubscription(_ webFeedRecordRef: CKRecord.Reference) { - let predicate = NSPredicate(format: "webFeed = %@", webFeedRecordRef) - let subscription = CKQuerySubscription(recordType: CloudKitWebFeed.recordType, predicate: predicate, options: [.firesOnRecordUpdate]) - - let info = CKSubscription.NotificationInfo() - info.shouldSendContentAvailable = true - info.desiredKeys = [CloudKitWebFeed.Fields.httpLastModified, CloudKitWebFeed.Fields.httpEtag] - subscription.notificationInfo = info - - self.save(subscription) { result in - switch result { - case .success(let subscription): - - let userSubscriptionRecord = CKRecord(recordType: CloudKitUserSubscription.recordType, recordID: self.generateRecordID()) - userSubscriptionRecord[CloudKitUserSubscription.Fields.userRecordID] = self.container?.userRecordID - userSubscriptionRecord[CloudKitUserSubscription.Fields.webFeed] = webFeedRecordRef - userSubscriptionRecord[CloudKitUserSubscription.Fields.subscriptionID] = subscription.subscriptionID + var webFeedRecords = [CKRecord]() + for webFeedURL in webFeedURLs { + let webFeedRecordID = CKRecord.ID(recordName: webFeedURL.md5String, zoneID: Self.zoneID) + let webFeedRecord = CKRecord(recordType: CloudKitWebFeed.recordType, recordID: webFeedRecordID) + webFeedRecord[CloudKitWebFeed.Fields.url] = webFeedURL + webFeedRecord[CloudKitWebFeed.Fields.httpLastModified] = "" + webFeedRecord[CloudKitWebFeed.Fields.httpEtag] = "" + webFeedRecords.append(webFeedRecord) + } - self.save(userSubscriptionRecord, completion: completion) - + self.saveIfNew(webFeedRecords) { _ in + + var subscriptions = [CKSubscription]() + let webFeedURLChunks = Array(webFeedURLs).chunked(into: 20) + for webFeedURLChunk in webFeedURLChunks { + + let predicate = NSPredicate(format: "url in %@", webFeedURLChunk) + let subscription = CKQuerySubscription(recordType: CloudKitWebFeed.recordType, predicate: predicate, options: [.firesOnRecordUpdate]) + let info = CKSubscription.NotificationInfo() + info.shouldSendContentAvailable = true + info.desiredKeys = [CloudKitWebFeed.Fields.httpLastModified, CloudKitWebFeed.Fields.httpEtag] + subscription.notificationInfo = info + subscriptions.append(subscription) + + } + + self.fetchAllUserSubscriptions() { result in + switch result { + case .success(let subscriptionsToDelete): + let subscriptionToDeleteIDs = subscriptionsToDelete.map({ $0.subscriptionID }) + self.modify(subscriptionsToSave: subscriptions, subscriptionIDsToDelete: subscriptionToDeleteIDs, completion: completion) case .failure(let error): completion(.failure(error)) } } + } - fetch(externalID: webFeedURLMD5String) { result in - switch result { - case .success(let record): - - let webFeedRecordRef = CKRecord.Reference(record: record, action: .none) - createSubscription(webFeedRecordRef) - - case .failure: - - let webFeedRecordID = CKRecord.ID(recordName: webFeedURLMD5String, zoneID: Self.zoneID) - let webFeedRecordRef = CKRecord.Reference(recordID: webFeedRecordID, action: .none) - let webFeedRecord = CKRecord(recordType: CloudKitWebFeed.recordType, recordID: webFeedRecordID) - webFeedRecord[CloudKitWebFeed.Fields.url] = webFeedURL - webFeedRecord[CloudKitWebFeed.Fields.httpLastModified] = "" - webFeedRecord[CloudKitWebFeed.Fields.httpEtag] = "" - - let webFeedCheckRecord = CKRecord(recordType: CloudKitWebFeedCheck.recordType, recordID: self.generateRecordID()) - webFeedRecord[CloudKitWebFeedCheck.Fields.webFeed] = webFeedRecordRef - webFeedRecord[CloudKitWebFeedCheck.Fields.lastCheck] = Date.distantPast - - self.save([webFeedRecord, webFeedCheckRecord]) { result in - switch result { - case .success: - createSubscription(webFeedRecordRef) - case .failure(let error): - completion(.failure(error)) - } - } - - } - } - - } - - /// Remove the subscription for the given feed along with its supporting record - func removeSubscription(_ webFeed: WebFeed, completion: @escaping (Result) -> Void) { - guard let userRecordID = self.container?.userRecordID else { - completion(.failure(CloudKitZoneError.invalidParameter)) - return - } - - let webFeedRecordID = CKRecord.ID(recordName: webFeed.url.md5String, zoneID: Self.zoneID) - let webFeedRecordRef = CKRecord.Reference(recordID: webFeedRecordID, action: .none) - let predicate = NSPredicate(format: "userRecordID = %@ AND webFeed = %@", userRecordID, webFeedRecordRef) - let ckQuery = CKQuery(recordType: CloudKitUserSubscription.recordType, predicate: predicate) - - query(ckQuery) { result in - switch result { - case .success(let records): - - if records.count > 0, let subscriptionID = records[0][CloudKitUserSubscription.Fields.subscriptionID] as? String { - self.delete(subscriptionID: subscriptionID) { result in - switch result { - case .success: - self.delete(recordID: records[0].recordID, completion: completion) - case .failure(let error): - completion(.failure(error)) - } - } - - } else { - os_log(.error, log: self.log, "Remove subscription error. The subscription wasn't found.") - completion(.success(())) - } - - case .failure(let error): - completion(.failure(error)) - } - } - } } diff --git a/Frameworks/Account/CloudKit/CloudKitZone.swift b/Frameworks/Account/CloudKit/CloudKitZone.swift index dc528e655..bf18652e3 100644 --- a/Frameworks/Account/CloudKit/CloudKitZone.swift +++ b/Frameworks/Account/CloudKit/CloudKitZone.swift @@ -200,6 +200,76 @@ extension CloudKitZone { modify(recordsToSave: records, recordIDsToDelete: [], completion: completion) } + /// Saves or modifies the records as long as they are unchanged relative to the local version + func saveIfNew(_ records: [CKRecord], completion: @escaping (Result) -> Void) { + let op = CKModifyRecordsOperation(recordsToSave: records, recordIDsToDelete: [CKRecord.ID]()) + op.savePolicy = .ifServerRecordUnchanged + op.isAtomic = false + + op.modifyRecordsCompletionBlock = { [weak self] (_, _, error) in + + guard let self = self else { return } + + switch CloudKitZoneResult.resolve(error) { + case .success: + DispatchQueue.main.async { + completion(.success(())) + } + case .zoneNotFound: + self.createZoneRecord() { result in + switch result { + case .success: + self.saveIfNew(records, completion: completion) + case .failure(let error): + DispatchQueue.main.async { + completion(.failure(error)) + } + } + } + case .userDeletedZone: + DispatchQueue.main.async { + completion(.failure(CloudKitZoneError.userDeletedZone)) + } + case .retry(let timeToWait): + self.retryIfPossible(after: timeToWait) { + self.saveIfNew(records, completion: completion) + } + case .limitExceeded: + + let chunkedRecords = records.chunked(into: 300) + + let group = DispatchGroup() + var errorOccurred = false + + for chunk in chunkedRecords { + group.enter() + self.saveIfNew(chunk) { result in + if case .failure(let error) = result { + os_log(.error, log: self.log, "%@ zone modify records error: %@", Self.zoneID.zoneName, error.localizedDescription) + errorOccurred = true + } + group.leave() + } + } + + group.notify(queue: DispatchQueue.main) { + if errorOccurred { + completion(.failure(CloudKitZoneError.unknown)) + } else { + completion(.success(())) + } + } + + default: + DispatchQueue.main.async { + completion(.failure(CloudKitError(error!))) + } + } + } + + database?.add(op) + } + /// Save the CKSubscription func save(_ subscription: CKSubscription, completion: @escaping (Result) -> Void) { database?.save(subscription) { savedSubscription, error in @@ -266,18 +336,37 @@ extension CloudKitZone { } } } + + /// Bulk add (or modify I suppose) and delete of subscriptions + func modify(subscriptionsToSave: [CKSubscription], subscriptionIDsToDelete: [CKSubscription.ID], completion: @escaping (Result) -> Void) { + let op = CKModifySubscriptionsOperation(subscriptionsToSave: subscriptionsToSave, subscriptionIDsToDelete: subscriptionIDsToDelete) + + op.modifySubscriptionsCompletionBlock = { [weak self] (_, _, error) in + guard let self = self else { return } + + switch CloudKitZoneResult.resolve(error) { + case .success: + DispatchQueue.main.async { + completion(.success(())) + } + case .retry(let timeToWait): + self.retryIfPossible(after: timeToWait) { + self.modify(subscriptionsToSave: subscriptionsToSave, subscriptionIDsToDelete: subscriptionIDsToDelete, completion: completion) + } + default: + DispatchQueue.main.async { + completion(.failure(CloudKitError(error!))) + } + } + } + + database?.add(op) + } /// Modify and delete the supplied CKRecords and CKRecord.IDs func modify(recordsToSave: [CKRecord], recordIDsToDelete: [CKRecord.ID], completion: @escaping (Result) -> Void) { let op = CKModifyRecordsOperation(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete) - - // We use .changedKeys savePolicy to do unlocked changes here cause my app is contentious and off-line first - // Apple suggests using .ifServerRecordUnchanged save policy - // For more, see Advanced CloudKit(https://developer.apple.com/videos/play/wwdc2014/231/) op.savePolicy = .changedKeys - - // To avoid CKError.partialFailure, make the operation atomic (if one record fails to get modified, they all fail) - // If you want to handle partial failures, set .isAtomic to false and implement CKOperationResultType .fail(reason: .partialFailure) where appropriate op.isAtomic = true op.modifyRecordsCompletionBlock = { [weak self] (_, _, error) in @@ -309,7 +398,6 @@ extension CloudKitZone { self.modify(recordsToSave: recordsToSave, recordIDsToDelete: recordIDsToDelete, completion: completion) } case .limitExceeded: - let chunkedRecords = recordsToSave.chunked(into: 300) let group = DispatchGroup() @@ -343,7 +431,27 @@ extension CloudKitZone { database?.add(op) } - + + /// Fetch all the subscriptions that a user has in the current database in all zones + func fetchAllUserSubscriptions(completion: @escaping (Result<[CKSubscription], Error>) -> Void) { + database?.fetchAllSubscriptions() { subscriptions, error in + switch CloudKitZoneResult.resolve(error) { + case .success: + DispatchQueue.main.async { + completion(.success((subscriptions!))) + } + case .retry(let timeToWait): + self.retryIfPossible(after: timeToWait) { + self.fetchAllUserSubscriptions(completion: completion) + } + default: + DispatchQueue.main.async { + completion(.failure(CloudKitError(error!))) + } + } + } + } + /// Fetch all the changes in the CKZone since the last time we checked func fetchChangesInZone(completion: @escaping (Result) -> Void) {