python-rx python


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from rx import Observable, Observer

class PrintObserver(Observer):
    """Observer that reports every notification it receives on stdout."""

    def on_next(self, value):
        """Print an item emitted by the observable."""
        print("Received {0}".format(value))

    def on_completed(self):
        """Print a message once the observable signals completion."""
        print("Done!")

    def on_error(self, error):
        """Print the error the observable terminated with."""
        print("Error Occurred: {0}".format(error))

# Observable that emits the five Greek letter names.
source = Observable.of(
    "Alpha",
    "Beta",
    "Gamma",
    "Delta",
    "Epsilon",
)

# Hand every notification to a PrintObserver instance.
printer = PrintObserver()
source.subscribe(printer)
1
2
3
4
5
6
7
8
9
from rx import Observable

def _show_item(value):
    """Print one emitted item."""
    print("Received {0}".format(value))


def _show_done():
    """Print the completion message."""
    print("Done!")


def _show_error(error):
    """Print the error passed to the error callback."""
    print("Error Occurred: {0}".format(error))


# Observable that emits the five Greek letter names.
source = Observable.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# Subscribe with plain callbacks instead of an Observer subclass.
source.subscribe(
    on_next=_show_item,
    on_error=_show_error,
    on_completed=_show_done,
)
1
2
3
4
5
6
from rx import Observable

# The chain is wrapped in parentheses so that each operator can sit on its
# own line; a line starting with "." is otherwise a syntax error.
(
    Observable.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
    .map(lambda s: len(s))  # replace each string with its length
    .filter(lambda i: i >= 5)  # keep only lengths of at least 5
    .subscribe(lambda value: print("Received {0}".format(value)))
)
1
2
3
4
5
6
7
from rx import Observable

# The chain is wrapped in parentheses so that each operator can sit on its
# own line; a line starting with "." is otherwise a syntax error.
(
    Observable.interval(1000)
    .map(lambda i: "{0} Mississippi".format(i))
    .subscribe(lambda s: print(s))
)

# Block the main thread until the user presses Enter; without this the script
# would reach its end immediately.
# NOTE(review): presumably interval() emits on a background scheduler, which
# is why the script has to be kept alive here -- confirm against the RxPY docs.
input("Press any key to quit\n")
1
2
3
4
5
6
7
8
9
10
11
12
from rx import Observable
from random import randint


# Source observable built from range(1, 3).
three_emissions = Observable.range(1, 3)

# Replace each emission with a random integer, then publish the result so the
# subscribers below are attached before connect() is called.
random_values = three_emissions.map(lambda _: randint(1, 100000))
three_random_ints = random_values.publish()

three_random_ints.subscribe(
    on_next=lambda n: print("Subscriber 1 Received: {0}".format(n))
)
three_random_ints.subscribe(
    on_next=lambda n: print("Subscriber 2 Received: {0}".format(n))
)

# Both subscribers are registered; start the published stream.
three_random_ints.connect()

下面的例子改用 `auto_connect(2)`：第 2 个订阅者订阅后，事件才会开始触发，无需再手动调用 `connect()`。

1
2
3
4
5
6
7
8
9
10
from rx import Observable
from random import randint


# Source observable built from range(1, 3).
three_emissions = Observable.range(1, 3)

# Map each emission to a random integer, publish the stream, and request an
# automatic connect with a subscriber count of 2 (no explicit connect() call).
three_random_ints = (
    three_emissions
    .map(lambda _: randint(1, 100000))
    .publish()
    .auto_connect(2)
)

three_random_ints.subscribe(
    on_next=lambda n: print("Subscriber 1 Received: {0}".format(n))
)
three_random_ints.subscribe(
    on_next=lambda n: print("Subscriber 2 Received: {0}".format(n))
)

1
2
3
4
5
6
7
8
9
10
from rx import Observable

# Observable that emits the five Greek letter names.
letters = Observable.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# Observable created with interval(1000).
intervals = Observable.interval(1000)

# Pair each letter with an interval emission and print the resulting tuple.
# The chain is wrapped in parentheses so the trailing ".subscribe" line is
# valid; a line starting with "." is otherwise a syntax error.
(
    Observable.zip(letters, intervals, lambda s, i: (s, i))
    .subscribe(lambda t: print(t))
)

# Block the main thread until the user presses Enter; without this the script
# would reach its end immediately.
# NOTE(review): presumably interval() emits on a background scheduler, which
# is why the script has to be kept alive here -- confirm against the RxPY docs.
input("Press any key to quit\n")