import rx import threading import concurrent.futures def prep(i): print('>', 'prep', threading.get_ident()) return i def calc(i): print('#', 'calc', threading.get_ident()) return i ** 2 def done(i): print('!', 'done', threading.get_ident()) executor = concurrent.futures.ThreadPoolExecutor(2) source = rx.subjects.Subject() ( source .map(prep) .select_many(lambda i: executor.submit(calc, i)) .subscribe(done) ) source.on_next(13) source.on_next(42) source.on_next(50) source.on_next(60) source.on_next(80)