forked from rsocket/rsocket-java
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTestConnection.java
More file actions
109 lines (90 loc) · 3.31 KB
/
TestConnection.java
File metadata and controls
109 lines (90 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket;
import static io.reactivex.Observable.*;
import java.io.IOException;
import org.reactivestreams.Publisher;
import io.reactivesocket.rx.Completable;
import io.reactivesocket.rx.Observer;
import io.reactivex.Observable;
import io.reactivex.Scheduler.Worker;
import io.reactivex.schedulers.Schedulers;
/**
 * {@link DuplexConnection} for tests, built on two {@link SerializedEventBus} instances.
 *
 * <p>Frames handed to either {@code addOutput} overload are sent on {@link #write};
 * observers subscribed through {@link #getInput()} are registered on {@link #toInput}.
 * Two instances can be wired together with
 * {@link #connectToServerConnection(TestConnection)}, which forwards each side's
 * {@code write} bus to the other side's {@code toInput} bus via a scheduler worker.
 */
public class TestConnection implements DuplexConnection {

    /** Bus that observers obtained from {@link #getInput()} are attached to. */
    public final SerializedEventBus toInput = new SerializedEventBus();

    /** Bus that receives every frame passed to {@code addOutput}. */
    public final SerializedEventBus write = new SerializedEventBus();

    /** Worker used to deliver server-written frames to this connection's {@link #toInput}. */
    Worker clientThread = Schedulers.newThread().createWorker();

    /** Worker used to deliver frames written here to the server connection's {@code toInput}. */
    Worker serverThread = Schedulers.newThread().createWorker();

    /**
     * Sends every frame emitted by {@code o} on {@link #write}.
     *
     * @param o        source of frames to write
     * @param callback signalled with {@code error} if the stream fails, or
     *                 {@code success} once it completes
     */
    @Override
    public void addOutput(Publisher<Frame> o, Completable callback) {
        Observable
            .fromPublisher(o)
            .flatMap(frame -> {
                // no backpressure on a Subject so just firehosing for this test
                write.send(frame);
                return Observable.<Void> empty();
            })
            .subscribe(ignored -> { }, callback::error, callback::success);
    }

    /**
     * Sends a single frame on {@link #write} and then signals {@code callback.success()}.
     *
     * @param f        frame to write
     * @param callback signalled after the frame has been sent
     */
    @Override
    public void addOutput(Frame f, Completable callback) {
        write.send(f);
        callback.success();
    }

    /**
     * Returns an observable whose {@code subscribe} registers the observer on
     * {@link #toInput} and hands it a disposable that unregisters it again.
     *
     * @return observable view over {@link #toInput}
     */
    @Override
    public io.reactivesocket.rx.Observable<Frame> getInput() {
        return new io.reactivesocket.rx.Observable<Frame>() {
            @Override
            public void subscribe(Observer<Frame> observer) {
                final io.reactivesocket.rx.Disposable detach = new io.reactivesocket.rx.Disposable() {
                    @Override
                    public void dispose() {
                        toInput.remove(observer);
                    }
                };
                toInput.add(observer);
                // we are okay with the race of sending data and cancelling ... since this is
                // "hot" by definition and unsubscribing is a race.
                observer.onSubscribe(detach);
            }
        };
    }

    /**
     * Wires this (client) connection to {@code serverConnection} with logging enabled.
     *
     * @param serverConnection the connection acting as the server side
     */
    public void connectToServerConnection(TestConnection serverConnection) {
        connectToServerConnection(serverConnection, true);
    }

    /**
     * Wires this (client) connection to {@code serverConnection}: frames written here are
     * scheduled on {@link #serverThread} and sent to the server's {@code toInput}; frames
     * written by the server are scheduled on {@link #clientThread} and sent to this
     * connection's {@link #toInput}.
     *
     * @param serverConnection the connection acting as the server side
     * @param log              when {@code true}, print every frame seen on the four buses
     *                         to {@code System.out}
     */
    public void connectToServerConnection(TestConnection serverConnection, boolean log) {
        if (log) {
            attachLogging(serverConnection);
        }

        // client to server
        write.add(frame -> {
            serverThread.schedule(() -> serverConnection.toInput.send(frame));
        });

        // server to client
        serverConnection.write.add(frame -> {
            clientThread.schedule(() -> toInput.send(frame));
        });
    }

    /** Registers a {@code System.out} printer on each of the four buses of the pair. */
    private void attachLogging(TestConnection serverConnection) {
        serverConnection.write.add(frame -> System.out.println("SERVER ==> Writes from server->client: " + frame + " Written from " + Thread.currentThread()));
        serverConnection.toInput.add(frame -> System.out.println("SERVER <== Input from client->server: " + frame + " Read on " + Thread.currentThread()));
        write.add(frame -> System.out.println("CLIENT ==> Writes from client->server: " + frame + " Written from " + Thread.currentThread()));
        toInput.add(frame -> System.out.println("CLIENT <== Input from server->client: " + frame + " Read on " + Thread.currentThread()));
    }

    /**
     * Disposes both scheduler workers owned by this connection.
     *
     * @throws IOException declared by the overridden signature; not thrown by this body
     */
    @Override
    public void close() throws IOException {
        clientThread.dispose();
        serverThread.dispose();
    }
}