Creating your own Observable (aka observable sequence)

There is one crucial thing to understand about observables.

When an observable is created, it doesn't perform any work simply because it has been created.

An Observable can generate elements in many ways. Some of them cause side effects, and some tap into processes that are already running, such as mouse events.

But if you just call a method that returns an Observable, no sequence generation is performed and there are no side effects. An Observable is only a definition of how the sequence is generated and which parameters are used for element generation. Sequence generation starts when the subscribe method is called.

For example, let's say you have a method with a prototype similar to this:

func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia(searchTerm: "me")

// no requests are performed, no work is being done, no URL requests were fired

let cancel = searchForMe
  // sequence generation starts now, URL requests are fired
  .subscribe(onNext: { results in
      print(results)
  })

There are many ways to create your own Observable sequence. Probably the easiest is to use the create function.

Let's create a function which creates a sequence that returns one element upon subscription. That function is called 'just'.

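This is the actual implementation:

func myJust<E>(_ element: E) -> Observable<E> {
    return Observable.create { observer in
        // Emit the single element, then signal completion.
        observer.on(.next(element))
        observer.on(.completed)
        // Nothing to cancel, so return a no-op disposable.
        return Disposables.create()
    }
}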

myJust(0)
    .subscribe(onNext: { n in
      print(n)
    })

This will print:

0

Not bad. So what is the create function?

It's just a convenience method that lets you implement the subscribe method using Swift closures. Like the subscribe method, it takes one argument, an observer, and returns a disposable.
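To make that idea concrete, here is a minimal sketch of how a create-style function could wrap a closure and use it as the subscribe implementation. The names ToyEvent, ToyDisposable, ToyObservable and toyCreateDemo are hypothetical and exist only for this illustration; this is not RxSwift's real implementation, which also handles errors, disposal and thread safety.

```swift
// Toy types for illustration only; NOT RxSwift's real implementation.
enum ToyEvent<Element> {
    case next(Element)
    case completed
}

// A disposable that does nothing when disposed.
final class ToyDisposable {
    func dispose() {}
}

struct ToyObservable<Element> {
    typealias Observer = (ToyEvent<Element>) -> Void

    // The stored closure *is* the subscribe implementation.
    private let subscribeHandler: (@escaping Observer) -> ToyDisposable

    private init(subscribeHandler: @escaping (@escaping Observer) -> ToyDisposable) {
        self.subscribeHandler = subscribeHandler
    }

    // Counterpart of `create`: stores the closure and performs no work.
    static func create(_ handler: @escaping (@escaping Observer) -> ToyDisposable) -> ToyObservable<Element> {
        return ToyObservable(subscribeHandler: handler)
    }

    // Element generation starts only when this is called.
    func subscribe(_ observer: @escaping Observer) -> ToyDisposable {
        return subscribeHandler(observer)
    }
}

// Records what happens, and in which order, around creation and subscription.
func toyCreateDemo() -> [String] {
    var log: [String] = []
    let one = ToyObservable<Int>.create { observer in
        log.append("generating")
        observer(.next(1))
        observer(.completed)
        return ToyDisposable()
    }
    log.append("created") // the closure above has not run yet
    _ = one.subscribe { event in
        if case .next(let value) = event { log.append("next \(value)") }
    }
    return log
}

print(toyCreateDemo())
```

In this sketch the closure passed to create runs once per subscribe call and never before it, which mirrors the behavior described above.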

A sequence implemented like myJust is synchronous. It generates its elements and terminates before the subscribe call returns the disposable that represents the subscription. Because of that, it doesn't really matter which disposable it returns: the process of generating elements can't be interrupted.

When generating synchronous sequences, the usual disposable to return is a no-op disposable, such as the one returned by Disposables.create() in the examples here.
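The ordering can be checked with a small experiment. The sketch below assumes RxSwift is available as a dependency, repeats the myJust definition so the snippet stands alone, and introduces a hypothetical helper, synchronousOrder, that records when each event happens relative to the subscribe call.

```swift
import RxSwift

// Same definition as in the text, repeated so this snippet stands alone.
func myJust<E>(_ element: E) -> Observable<E> {
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

// Hypothetical helper: records the order of events around a subscription.
func synchronousOrder() -> [String] {
    var log = ["before subscribe"]
    let subscription = myJust(0)
        .subscribe(
            onNext: { n in log.append("next \(n)") },
            onCompleted: { log.append("completed") }
        )
    // By the time subscribe returns, the sequence has already finished.
    log.append("after subscribe")
    // Disposing now has nothing left to cancel.
    subscription.dispose()
    return log
}

print(synchronousOrder())
```

The recorded order is "before subscribe", "next 0", "completed", "after subscribe", so the disposable only becomes available after the sequence has already terminated.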

Let's now create an observable that returns elements from an array.

This is the actual implementation:

func myFrom<E>(_ sequence: [E]) -> Observable<E> {
    return Observable.create { observer in
        // Emit every element of the array in order.
        for element in sequence {
            observer.on(.next(element))
        }

        // Signal completion once the array is exhausted.
        observer.on(.completed)
        return Disposables.create()
    }
}

let stringCounter = myFrom(["first", "second"])

print("Started ----")

// first time
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("----")

// again
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("Ended ----")

This will print:

Started ----
first
second
----
first
second
Ended ----
