Sharing subscription and shareReplay
operator
But what if you want that multiple observers share events (elements) from only one subscription?
There are two things that need to be defined.
- How to handle past elements that have been received before the new subscriber was interested in observing them (replay latest only, replay all, replay last n)
- How to decide when to fire that shared subscription (refCount, manual or some other algorithm)
The usual choice is a combination of replay(1).refCount()
aka shareReplay()
.
let counter = myInterval(0.1)
.shareReplay(1)
print("Started ----")
let subscription1 = counter
.subscribe(onNext: { n in
print("First \(n)")
})
let subscription2 = counter
.subscribe(onNext: { n in
print("Second \(n)")
})
Thread.sleep(forTimeInterval: 0.5)
subscription1.dispose()
Thread.sleep(forTimeInterval: 0.5)
subscription2.dispose()
print("Ended ----")
this will print
Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----
Notice how now there is only one Subscribed
and Disposed
event.
Behavior for URL observables is equivalent.
This is how HTTP requests are wrapped in Rx. It's pretty much the same pattern like the interval
operator.
extension Reactive where Base: URLSession {
public func response(_ request: URLRequest) -> Observable<(Data, HTTPURLResponse)> {
return Observable.create { observer in
let task = self.dataTaskWithRequest(request) { (data, response, error) in
guard let response = response, let data = data else {
observer.on(.error(error ?? RxCocoaURLError.Unknown))
return
}
guard let httpResponse = response as? HTTPURLResponse else {
observer.on(.error(RxCocoaURLError.nonHTTPResponse(response: response)))
return
}
observer.on(.next(data, httpResponse))
observer.on(.completed)
}
task.resume()
return Disposables.create {
task.cancel()
}
}
}
}