When we talk about Rx, it often boils down to connecting (binding) our data sources to the UI. You can see it clearly in our examples: we connect data to the UI on a daily basis.
In previous parts of the series, apart from UI bindings, we also talked about retrieving data. When we fetch data from the server, most of the time we have to parse it somehow. If the data is large enough, mapping can be memory and time consuming. If that work is scheduled on the main thread, it blocks the UI, which results in a horrendous user experience in our end product.
In part #3 of the series we were mapping objects as well. We used something called `MainScheduler.instance` in a few operators, because “we had to make sure our data is on the main thread”. In fact, this is a Scheduler, not a Thread, so why were we talking about threads at all? Moreover, we’ve just learned that we shouldn’t map objects on the main thread, but it seems like we did exactly that in our last example. What is going on?
You can find all the answers in today’s article! Refresh your memory of the earlier parts of the series, grab a drink, and get ready for action!
Schedulers
We will start with a little bit of theory about schedulers. When we perform operations with Rx, by default everything happens on the same thread. Unless you change the scheduler manually, the chain starts on the current thread and is also disposed on that same thread.
Schedulers are not really threads, but as the name suggests they schedule the tasks they are given. We have 2 types of schedulers: serial and concurrent. Below is the list of schedulers that are already built in:
- CurrentThreadScheduler (Serial) – schedules on the current thread, this is also the default scheduler.
- MainScheduler (Serial) – schedules on the main thread.
- SerialDispatchQueueScheduler (Serial) – schedules on a specific queue (`dispatch_queue_t`).
- ConcurrentDispatchQueueScheduler (Concurrent) – schedules on a specific queue (`dispatch_queue_t`).
- OperationQueueScheduler (Concurrent) – schedules on a specific queue (`NSOperationQueue`).
Interestingly, if you pass a concurrent queue to a serial scheduler, RxSwift will make sure that it is transformed into a serial queue.
The other way around (passing a serial queue to a concurrent scheduler) shouldn’t cause any problems either, but we’d rather avoid that, if possible.
You can also implement your own scheduler if you need some customization; this document is really helpful if you do so.
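If the serial vs. concurrent distinction feels abstract, here is a minimal sketch using plain Grand Central Dispatch queues, the kind of queues the dispatch-queue schedulers above schedule on. It uses current Swift syntax rather than the Swift 2 syntax of the rest of this article. `OrderLog` and `runOrder` are hypothetical helpers made up for this illustration; they are not RxSwift API.

```swift
import Foundation

// Hypothetical thread-safe log used only for this illustration.
final class OrderLog: @unchecked Sendable {
    private let lock = NSLock()
    private var values: [Int] = []

    func append(_ value: Int) {
        lock.lock()
        values.append(value)
        lock.unlock()
    }

    var snapshot: [Int] {
        lock.lock()
        defer { lock.unlock() }
        return values
    }
}

// Submits `count` tasks to `queue` and returns the order in which they ran.
func runOrder(on queue: DispatchQueue, count: Int) -> [Int] {
    let orderLog = OrderLog()
    let group = DispatchGroup()
    for index in 0..<count {
        queue.async(group: group) {
            orderLog.append(index)
        }
    }
    group.wait() // block until every submitted task has finished
    return orderLog.snapshot
}

let serialQueue = DispatchQueue(label: "example.serial")
let concurrentQueue = DispatchQueue(label: "example.concurrent", attributes: .concurrent)

// A serial queue runs one task at a time, in submission order.
let serialOrder = runOrder(on: serialQueue, count: 5)

// A concurrent queue may run tasks in parallel, so the order is not guaranteed;
// only the set of tasks that ran is.
let concurrentOrder = runOrder(on: concurrentQueue, count: 5)

print(serialOrder)
print(concurrentOrder.sorted())
```

The serial queue always reports the tasks in submission order, while the concurrent queue only guarantees that all of them ran.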
observeOn() & subscribeOn()
These two methods are really the core of multithreading in Rx. When you look at them, you might think that their names make it obvious what they do. In reality, few people understand the difference between them, or why a specific behaviour occurs when they are used. But let’s forget about them for a second, we’ve just got a call from a friend and we are going on a road trip!
Well… maybe not exactly that kind of an adventure, but really close. Our friend Emily asked us to take care of her cat Ethan while she was away on holiday. She just came back and we need to take Ethan back to his mom. We have to drive a few hours to her house, so better prepare for some road trip fun!
Normally when we drive to Emily, we take the default route via the highway. But today we want to change something in our lives and we choose to go with the two-lane freeway. The weather is so great that after an hour of driving we stop the car to breathe some fresh air. Suddenly it hits us: driving in that weather on a highway would be the best thing ever. And because fresh air has always had a bad impact on us, we decide to go back on the good old highway. In the company of good music, a fine cat and beautiful weather, after a few hours of driving we finally meet Emily, deliver the cat, and everyone is happy. And we’ve just learned both `observeOn()` and `subscribeOn()`.
To clarify: we are an Observable, Ethan is our produced Signal, the route is a Scheduler and Emily is an Observer. Emily subscribes to us, and she believes that she will get a new signal (a cat). We have a default route when we drive to Emily. It is the same with Rx, which also has a default scheduler. But this time we’ve chosen to use a different route (scheduler) as a starting point. When you want to start your trip on a route different from the default one, you use the `subscribeOn()` method. Here’s the catch: if you use `subscribeOn()`, you can’t be sure that at the end of the trip (`subscribeNext()` used by Emily) you will be on the same route you started on. You only guarantee that you will start there.
The second method, `observeOn()`, can change the route as well. But it is not constrained to the beginning of the trip: you can switch the route using `observeOn()` at any point of the trip. In comparison, `subscribeOn()` switches the route only at the beginning, and that’s the difference. Most of the time you will use `observeOn()` though.
Alright, back to the cat delivery. Pseudo-code that would represent our delivery in RxSwift could look like this:
    catObservable // 1
        .breatheFreshAir() // 2
        .observeOn(MainRouteScheduler.instance) // 3
        .subscribeOn(TwoLaneFreewayScheduler.instance) // 4
        .subscribeNext { cat in // 5
            if cat is Ethan {
                hug(cat)
            }
        }
        .addDisposableTo(disposeBag)
Step-by-step:
1. We subscribe to Cat observable, which emits Cat signals.
2. Without any scheduler operators, this would run on the same scheduler we were on before the subscription (which is the default behaviour of Rx).
3. Switch the scheduler to the `MainRouteScheduler`. Now every operation below this one will be scheduled on `MainRouteScheduler` (of course if we don’t change the scheduler again later in the pipeline).
4. Now we say that we start the chain on `TwoLaneFreewayScheduler`. So `breatheFreshAir()` will be scheduled on `TwoLaneFreewayScheduler`, and then the scheduler is changed again using `observeOn()`.
5. `subscribeNext()` gets scheduled by `MainRouteScheduler`. If we didn’t add the `observeOn()` before, it would get scheduled by `TwoLaneFreewayScheduler`.
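To make the five steps tangible without pulling in RxSwift, here is a deliberately tiny sketch in current Swift syntax. `MiniObservable`, `RouteLog` and `currentRoute()` are hypothetical stand-ins written only for this illustration (they are not RxSwift types), and plain dispatch queues play the role of the two route schedulers. It only mimics the scheduling behaviour described above; it ignores errors, completion and disposal.

```swift
import Foundation

// Hypothetical thread-safe log of where each step ran.
final class RouteLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []
    func add(_ entry: String) { lock.lock(); entries.append(entry); lock.unlock() }
    var all: [String] { lock.lock(); defer { lock.unlock() }; return entries }
}

// A tiny stand-in for an Observable: just a function that accepts an observer.
struct MiniObservable<T> {
    let subscribe: (@escaping (T) -> Void) -> Void

    // Decides where the subscription (the start of the trip) happens.
    func subscribeOn(_ queue: DispatchQueue) -> MiniObservable<T> {
        return MiniObservable { observer in
            queue.async { self.subscribe(observer) }
        }
    }

    // Decides where everything below this call receives its values.
    func observeOn(_ queue: DispatchQueue) -> MiniObservable<T> {
        return MiniObservable { observer in
            self.subscribe { value in queue.async { observer(value) } }
        }
    }

    func map<U>(_ transform: @escaping (T) -> U) -> MiniObservable<U> {
        return MiniObservable<U> { observer in
            self.subscribe { value in observer(transform(value)) }
        }
    }
}

// Two serial queues tagged with a name so we can ask "which route am I on?".
let routeKey = DispatchSpecificKey<String>()
let freeway = DispatchQueue(label: "two-lane-freeway")
let highway = DispatchQueue(label: "highway")
freeway.setSpecific(key: routeKey, value: "freeway")
highway.setSpecific(key: routeKey, value: "highway")

func currentRoute() -> String {
    return DispatchQueue.getSpecific(key: routeKey) ?? "default"
}

let routes = RouteLog()
let delivered = DispatchSemaphore(value: 0)

let catObservable = MiniObservable<String> { observer in
    routes.add("emit on \(currentRoute())")
    observer("Ethan")
}

catObservable
    .map { cat -> String in                 // plays the role of breatheFreshAir()
        routes.add("breatheFreshAir on \(currentRoute())")
        return cat
    }
    .observeOn(highway)                     // everything below runs on the highway
    .subscribeOn(freeway)                   // the chain starts on the freeway
    .subscribe { cat in
        routes.add("received \(cat) on \(currentRoute())")
        delivered.signal()
    }

delivered.wait()
print(routes.all)
```

Even though `subscribeOn` appears below `observeOn` in the chain, it still decides where the trip starts; `observeOn` only affects what comes after it.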
In summary: `subscribeOn()` specifies where the whole chain starts, while `observeOn()` specifies where to head next. The image below (courtesy of reactivex.io) should now make it pretty clear what happens when these methods are called. Notice the blue arrows, which show the `subscribeOn()` scheduler, and the orange and pink arrows, which show the two different schedulers used by each `observeOn()` call.
And with that in mind we can move on to our example!
Example
In part #3 (which contains knowledge that is mandatory for this example, so please be sure to review that one!) we searched issues for a given repository on GitHub. Today we will search for repositories of a given username, also on GitHub. This time, however, we will use Alamofire for network requests and ObjectMapper for parsing our objects. Thanks to the awesome RxSwiftCommunity, we also have an extension for Alamofire and RxSwift, called RxAlamofire, which I also mentioned in the previous article.
First let’s create a project, as we did in the previous tutorials. We will also use CocoaPods with RxAlamofire/RxCocoa (which has RxSwift, RxCocoa and Alamofire as dependencies) and ObjectMapper. Our Podfile should look like this:
    platform :ios, '8.0'
    use_frameworks!

    target 'RxAlamofireExample' do
      pod 'RxAlamofire/RxCocoa'
      pod 'ObjectMapper'
    end
Now onto the coding part! As you probably know, making an outline before doing the task is really helpful, so we will try to think about what we need to do and how we can approach this. Example outline in our case could be:
- Create UI, probably standard one with `UISearchBar` and `UITableView`.
- Observe the search bar, and every time a new value is given, transform it into an array of repositories (if possible). Here we will need a model for network requests.
- Update the table view with our new data. We need to think about schedulers and how to not flood the UI.
Step 1 – Controller and UI.
We will start with UI which again is just a `UITableView` and `UISearchBar`. You can follow the design in the GIF above, or you can create your own – what suits you the most!
Next, we will need a controller that is going to manage everything: starting with observing the text field and ending with passing the fetched repositories to our table view. Let’s create a new file called `RepositoriesViewController.swift`, and prepare our controller by importing modules and adding some basic configuration:
    import UIKit
    import ObjectMapper
    import RxAlamofire
    import RxCocoa
    import RxSwift

    class RepositoriesViewController: UIViewController {

        @IBOutlet weak var tableView: UITableView!
        @IBOutlet weak var searchBar: UISearchBar!

        override func viewDidLoad() {
            super.viewDidLoad()
            setupRx()
        }

        func setupRx() {
        }
    }
Again we prepared the `setupRx()` method already, because we can feel that we will rely heavily on Rx (is that a prophecy?).
So let’s create our observable from the search bar’s `rx_text` property, like in our latest example (`throttle()` & `distinctUntilChanged()` included). But this time let’s add a filter to that: we don’t really want empty values. When that happens, we will just leave the latest results in the table view. So the observable should look like the one below:
    class RepositoriesViewController: UIViewController {
        ...
        var rx_searchBarText: Observable<String> {
            return searchBar
                .rx_text
                .filter { $0.characters.count > 0 } // notice the new filter line
                .throttle(0.5, scheduler: MainScheduler.instance)
                .distinctUntilChanged()
        }
        ...
    }
And we will just add it as a variable to our `RepositoriesViewController`. Now we need to transform our new observable into `Observable<[Repository]>` and pass the result back to the `UITableView`. And we’ve done that before!
Step 2 – Network model and mapping objects
Right, the network model! But before that, we need a setup for mapping our objects. This time we will use a different mapper, just so you know there are a lot of good options and you should always search for the one you like the most. `ObjectMapper` is also an awesome mapper and the configuration is kinda similar to the `Mapper` library. Let’s create a new file called `Repository.swift` and implement the mapping setup for the `Repository` object:
    import ObjectMapper

    class Repository: Mappable {
        var identifier: Int!
        var language: String!
        var url: String!
        var name: String!

        required init?(_ map: Map) { }

        func mapping(map: Map) {
            identifier <- map["id"]
            language <- map["language"]
            url <- map["url"]
            name <- map["name"]
        }
    }
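As a side note, here is a rough sketch of the same key-to-property mapping using Foundation’s `Codable` instead of ObjectMapper, in current Swift syntax. This is a swapped-in technique for illustration only, not what the rest of this article uses. The `DecodedRepository` type and the sample JSON are made up for this example; only the four keys (`id`, `language`, `url`, `name`) come from the mapping above.

```swift
import Foundation

// Hypothetical Codable counterpart of the Repository class above.
struct DecodedRepository: Decodable {
    let identifier: Int
    let language: String?   // modelled as optional so a JSON null decodes to nil
    let url: String
    let name: String

    // Maps the JSON key "id" to the `identifier` property, like `identifier <- map["id"]`.
    enum CodingKeys: String, CodingKey {
        case identifier = "id"
        case language, url, name
    }
}

// Made-up sample payload with the same keys used by the mapping above.
let sampleJSON = Data("""
[
  {"id": 1, "language": "Swift", "url": "https://example.com/repos/1", "name": "first"},
  {"id": 2, "language": null, "url": "https://example.com/repos/2", "name": "second"}
]
""".utf8)

// Fall back to an empty array when decoding fails, like the `else { return [] }` branch
// used later in this article.
let decoded = (try? JSONDecoder().decode([DecodedRepository].self, from: sampleJSON)) ?? []

for repository in decoded {
    print(repository.identifier, repository.name, repository.language ?? "unknown")
}
```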
Perfect! That was the last part before the real action…
We have a controller, we have a model for `Repository` object… Yeah, time for our network model!
We will initialize the model with a specific `Observable<String>` and implement a method that returns `Observable<[Repository]>`. Then we can connect it to our view in `RepositoriesViewController`. The initial implementation of our `RepositoryNetworkModel.swift` could look like this:
    import ObjectMapper
    import RxAlamofire
    import RxCocoa
    import RxSwift

    struct RepositoryNetworkModel {

        private var repositoryName: Observable<String>

        private func fetchRepositories() -> Driver<[Repository]> {
            ...
        }
    }
Looks pretty casual at first. But if you look closely, we don’t actually return `Observable<[Repository]>`, but `Driver<[Repository]>` instead. What is this `Driver` guy and why are you lying to me all the time?
Well… We are talking about Schedulers today. And when we want to bind data to our UI, we always want to do it on `MainScheduler`. That is basically the role of a `Driver`. A `Driver` is an observable sequence (not to be confused with RxSwift’s `Variable` type) that says “Okay buddy, I’m gonna be on the main thread so don’t worry and bind me!”. Its sequence also never errors out. This way our binding is less error-prone and we can safely do the connection.
Okay, but what about the implementation? Well, let’s start with `flatMapLatest()`, which we have already used before, and transform our `Observable<String>` to `Observable<[Repository]>`:
    struct RepositoryNetworkModel {
        ...
        private func fetchRepositories() -> Driver<[Repository]> {
            return repositoryName
                .flatMapLatest { text in
                    return RxAlamofire
                        .requestJSON(.GET, "https://api.github.com/users/\(text)/repos")
                        .debug()
                        .catchError { error in
                            return Observable.never()
                        }
                }
                .map { (response, json) -> [Repository] in
                    if let repos = Mapper<Repository>().mapArray(json) {
                        return repos
                    } else {
                        return []
                    }
                }
        }
        ...
    }
Hmm, first of all it seems different: there is another `map()` in addition to `flatMapLatest()`. But in reality it isn’t something you should be afraid of. See, in the `flatMapLatest()` we do the usual network request, and if there is any error we swallow it by returning `Observable.never()`, which emits nothing, so the failed request produces no values and the outer sequence stays alive. Then we map our response from `Alamofire` to `[Repository]`. We could chain the `map()` inside the `flatMapLatest()` (after the `catchError()`) as well, but we will need it outside of the `flatMapLatest()` later, so it is just a matter of preference.
Okay, the code above shouldn’t compile (because we return an `Observable` and we are expected to return a `Driver`), so we need to go deeper. How do we transform `Observable<[Repository]>` to `Driver<[Repository]>`? Well, it is really simple. Any `Observable` can be transformed to a `Driver` just by using the `asDriver()` operation. In our case, we will use `.asDriver(onErrorJustReturn: [])`, which basically means: if there is any error in the chain (but there probably isn’t, since we covered them before), return an empty array. And that’s it! The working code for now would be:
    struct RepositoryNetworkModel {
        ...
        private func fetchRepositories() -> Driver<[Repository]> {
            return repositoryName
                .flatMapLatest { text in
                    return RxAlamofire
                        .requestJSON(.GET, "https://api.github.com/users/\(text)/repos")
                        .debug()
                        .catchError { error in
                            return Observable.never()
                        }
                }
                .map { (response, json) -> [Repository] in
                    if let repos = Mapper<Repository>().mapArray(json) {
                        return repos
                    } else {
                        return []
                    }
                }
                .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler
        }
        ...
    }
Perfect! See, we didn’t even touch `observeOn()` or `subscribeOn()`, but we already switched schedulers 2 times! The first time was with `throttle()` and now with `asDriver()` (which makes sure we are on `MainScheduler`), and that is only the beginning. The code should work now and the last thing we have to do is to connect the repositories from our `RepositoryNetworkModel` to a view controller. But before that let’s replace the method with something else, because that way we create a new pipeline every time we call it. Instead, I would love a property. But not a computed property, because the result would be the same as with a method. Instead, we will create a `lazy var` that is initialized by our method that fetches repositories. This way we avoid creating the sequence multiple times. We also need to hide everything that isn’t the property, just to be sure that everyone who uses this model gets the correct `Driver` property! The only con of this solution is that we have to explicitly write the init in our struct, but I think this is a fair trade. So the current final model should look like the one below:
    struct RepositoryNetworkModel {

        lazy var rx_repositories: Driver<[Repository]> = self.fetchRepositories()
        private var repositoryName: Observable<String>

        init(withNameObservable nameObservable: Observable<String>) {
            self.repositoryName = nameObservable
        }

        private func fetchRepositories() -> Driver<[Repository]> {
            return repositoryName
                .flatMapLatest { text in
                    return RxAlamofire
                        .requestJSON(.GET, "https://api.github.com/users/\(text)/repos")
                        .debug()
                        .catchError { error in
                            return Observable.never()
                        }
                }
                .map { (response, json) -> [Repository] in
                    if let repos = Mapper<Repository>().mapArray(json) {
                        return repos
                    } else {
                        return []
                    }
                }
                .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler
        }
    }
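To see why a `lazy var` behaves differently from a computed property here, consider this small sketch in current Swift syntax. `Pipeline`, `BuildCounter` and `Model` are hypothetical stand-ins invented for this illustration: `Pipeline` represents a sequence that is expensive to build, and the counter records how many times it was built.

```swift
// Hypothetical stand-in for a sequence that is expensive to build.
final class Pipeline {}

// Counts how many pipelines were created.
final class BuildCounter {
    var count = 0
}

struct Model {
    let counter: BuildCounter

    // Explicit init, mirroring the explicit init in the network model above.
    init(counter: BuildCounter) {
        self.counter = counter
    }

    private func makePipeline() -> Pipeline {
        counter.count += 1
        return Pipeline()
    }

    // Computed property: the body runs on every access, just like calling the method.
    var computedPipeline: Pipeline {
        return makePipeline()
    }

    // Lazy property: the initializer runs once, on first access, and the result is stored.
    lazy var lazyPipeline: Pipeline = self.makePipeline()
}

let counter = BuildCounter()
var model = Model(counter: counter) // must be `var`: reading a lazy property mutates the struct

let firstComputed = model.computedPipeline
let secondComputed = model.computedPipeline
let buildsAfterComputed = counter.count          // two accesses, two pipelines

let firstLazy = model.lazyPipeline
let secondLazy = model.lazyPipeline
let buildsForLazy = counter.count - buildsAfterComputed // two accesses, one pipeline

print(buildsAfterComputed, buildsForLazy)
```

Note that the struct has to be stored in a `var` for this to work, because the first read of a lazy property writes the stored value.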
Great! Now we will just connect the data to the view controller. When we want to bind the `Driver` to our table view, instead of `bindTo` (which we used before) we will use the `drive()` operation, but the syntax and everything else is just the same as with `bindTo`. In addition to binding our data to the table view, we will also make another subscription, and every time the count of repositories equals 0, we will show an alert.
The final `RepositoriesViewController` class looks as follows:
    class RepositoriesViewController: UIViewController {

        @IBOutlet weak var tableViewBottomConstraint: NSLayoutConstraint!
        @IBOutlet weak var tableView: UITableView!
        @IBOutlet weak var searchBar: UISearchBar!

        let disposeBag = DisposeBag()
        var repositoryNetworkModel: RepositoryNetworkModel!

        var rx_searchBarText: Observable<String> {
            return searchBar
                .rx_text
                .filter { $0.characters.count > 0 }
                .throttle(0.5, scheduler: MainScheduler.instance)
                .distinctUntilChanged()
        }

        override func viewDidLoad() {
            super.viewDidLoad()
            setupRx()
        }

        func setupRx() {
            repositoryNetworkModel = RepositoryNetworkModel(withNameObservable: rx_searchBarText)

            repositoryNetworkModel
                .rx_repositories
                .drive(tableView.rx_itemsWithCellFactory) { (tv, i, repository) in
                    let cell = tv.dequeueReusableCellWithIdentifier("repositoryCell", forIndexPath: NSIndexPath(forRow: i, inSection: 0))
                    cell.textLabel?.text = repository.name
                    return cell
                }
                .addDisposableTo(disposeBag)

            repositoryNetworkModel
                .rx_repositories
                .driveNext { repositories in
                    if repositories.count == 0 {
                        let alert = UIAlertController(title: ":(", message: "No repositories for this user.", preferredStyle: .Alert)
                        alert.addAction(UIAlertAction(title: "OK", style: .Default, handler: nil))
                        if self.navigationController?.visibleViewController?.isMemberOfClass(UIAlertController.self) != true {
                            self.presentViewController(alert, animated: true, completion: nil)
                        }
                    }
                }
                .addDisposableTo(disposeBag)
        }
    }
The only new thing in this piece of code is the `driveNext()` operation, but as you can guess it is just a `subscribeNext` for `Driver`.
And that’s it for Step 2! Onto the last one!
Step 3 – Multithreading optimization
So as you may have guessed, in fact everything we did was done on `MainScheduler`. Why? Because our chain starts from `searchBar.rx_text` and this one is guaranteed to be on `MainScheduler`. And because everything else runs on the current scheduler by default, well, our UI thread may get overwhelmed. How do we prevent that? Switch to a background thread before the request and before the mapping, so that only the UI update happens on the main thread:
    struct RepositoryNetworkModel {
        ...
        private func fetchRepositories() -> Driver<[Repository]> {
            return repositoryName
                .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background))
                .flatMapLatest { text in // .Background thread, network request
                    return RxAlamofire
                        .requestJSON(.GET, "https://api.github.com/users/\(text)/repos")
                        .debug()
                        .catchError { error in
                            return Observable.never()
                        }
                }
                .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background))
                .map { (response, json) -> [Repository] in // again back to .Background, map objects
                    if let repos = Mapper<Repository>().mapArray(json) {
                        return repos
                    } else {
                        return []
                    }
                }
                .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler
        }
        ...
    }
But why do we use `observeOn()` two times in the same way? Because we can’t tell from the code above whether `requestJSON` will return data on the same thread it started on. So we make sure we are on a background thread again for the mapping, which is quite heavy.
Alright, now we have mapping on background threads and the result delivered on the UI thread, can we ask for more?! Of course we can! We want our user to know that a network request is going on. For that we will use the `UIApplication.sharedApplication().networkActivityIndicatorVisible` property, widely known as a spinner. But now we have to be careful with threads, since we want to update the UI in the middle of the request/mapping operations. We will also use a nice method called `doOn()`, which can do whatever you want on specific events (like .Next, .Error etc.). Let’s say we want to show the spinner before the `flatMapLatest()`: `doOn` is the one that can do that. We just need to switch to `MainScheduler` before the actions are performed.
So our full code for fetching repositories would be as follows:
    struct RepositoryNetworkModel {

        lazy var rx_repositories: Driver<[Repository]> = self.fetchRepositories()
        private var repositoryName: Observable<String>

        init(withNameObservable nameObservable: Observable<String>) {
            self.repositoryName = nameObservable
        }

        private func fetchRepositories() -> Driver<[Repository]> {
            return repositoryName
                .subscribeOn(MainScheduler.instance) // Make sure we are on MainScheduler
                .doOn(onNext: { response in
                    UIApplication.sharedApplication().networkActivityIndicatorVisible = true
                })
                .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background))
                .flatMapLatest { text in // .Background thread, network request
                    return RxAlamofire
                        .requestJSON(.GET, "https://api.github.com/users/\(text)/repos")
                        .debug()
                        .catchError { error in
                            return Observable.never()
                        }
                }
                .observeOn(ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background))
                .map { (response, json) -> [Repository] in // again back to .Background, map objects
                    if let repos = Mapper<Repository>().mapArray(json) {
                        return repos
                    } else {
                        return []
                    }
                }
                .observeOn(MainScheduler.instance) // switch to MainScheduler, UI updates
                .doOn(onNext: { response in
                    UIApplication.sharedApplication().networkActivityIndicatorVisible = false
                })
                .asDriver(onErrorJustReturn: []) // This also makes sure that we are on MainScheduler
        }
    }
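If you strip away the Rx operators, the ordering of side effects in this pipeline can be sketched with plain Grand Central Dispatch in current Swift syntax. This is only an analogy under a few assumptions: a serial `uiQueue` stands in for the main queue (so the sketch also runs as a plain script), `EventLog` and `loadRepositories` are hypothetical helpers, and the repository names are made up instead of being fetched from GitHub.

```swift
import Foundation

// Hypothetical thread-safe log of the side effects, in the order they happened.
final class EventLog: @unchecked Sendable {
    private let lock = NSLock()
    private var entries: [String] = []
    func add(_ entry: String) { lock.lock(); entries.append(entry); lock.unlock() }
    var all: [String] { lock.lock(); defer { lock.unlock() }; return entries }
}

// Assumption: a serial queue standing in for the main queue / MainScheduler.
let uiQueue = DispatchQueue(label: "stand-in.for.main")
// Global concurrent queue with background QoS, similar in spirit to
// ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Background).
let workQueue = DispatchQueue.global(qos: .background)

func loadRepositories(for user: String,
                      events: EventLog,
                      completion: @escaping ([String]) -> Void) {
    uiQueue.async {
        events.add("spinner on")                 // like the first doOn(onNext:)
        workQueue.async {
            events.add("request + mapping")      // heavy work stays off the UI queue
            let repositories = ["\(user)/first", "\(user)/second"] // made-up result
            uiQueue.async {
                events.add("spinner off")        // like the second doOn(onNext:)
                completion(repositories)         // UI update happens on the UI queue
            }
        }
    }
}

let events = EventLog()
let finished = DispatchSemaphore(value: 0)

loadRepositories(for: "octocat", events: events) { repositories in
    events.add("delivered \(repositories.count)")
    finished.signal()
}

finished.wait()
print(events.all)
```

The Rx version expresses the same hops declaratively: each `observeOn()` marks the point where the work moves to another queue.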
And that’s it! Now you also know why we didn’t care about threads when parsing in part #3: the `Moya-ModelMapper` extension switches schedulers for us.
Full app should be working as expected now:
This one was even longer than the last one so once again thank you for your patience, persistence and willingness to follow this not-so-easy example with me.
As always, please keep me updated with your feedback, ideas and improvements on twitter, or just comment here. Your messages always make my day so thank you for that as well. ✌️
Also I’m improving the resources for RxSwift all the time, so be sure to check them out. And `subscribe()` to get the latest info about our series or RxSwift in general.
You can find complete source code on Droids on Roids’s GitHub repository and here you can check other RxSwift examples!