/**
 * Copyright 2015 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.reactivesocket;

import io.reactivesocket.internal.PublisherUtils;
import org.reactivestreams.Publisher;

import java.util.Objects;
import java.util.function.Function;

/**
 * Declares one handler method per interaction type: request/response,
 * request/stream, subscription, fire-and-forget, channel and metadata push.
 *
 * <p>Either subclass this type directly, or use {@link Builder} to assemble an
 * instance from individual {@link Function}s. Any interaction for which no
 * function is supplied to the builder falls back to the matching
 * {@code NO_*_HANDLER} constant.
 */
public abstract class RequestHandler {

    /** Default request/response handler: answers with an error built by {@code PublisherUtils.errorPayload}. */
    public static final Function<Payload, Publisher<Payload>> NO_REQUEST_RESPONSE_HANDLER =
        payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestResponse' handler"));

    /** Default request/stream handler: answers with an error built by {@code PublisherUtils.errorPayload}. */
    public static final Function<Payload, Publisher<Payload>> NO_REQUEST_STREAM_HANDLER =
        payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestStream' handler"));

    /** Default subscription handler: answers with an error built by {@code PublisherUtils.errorPayload}. */
    public static final Function<Payload, Publisher<Payload>> NO_REQUEST_SUBSCRIPTION_HANDLER =
        payload -> PublisherUtils.errorPayload(new RuntimeException("No 'requestSubscription' handler"));

    /** Default fire-and-forget handler: answers with an error built by {@code PublisherUtils.errorVoid}. */
    public static final Function<Payload, Publisher<Void>> NO_FIRE_AND_FORGET_HANDLER =
        payload -> PublisherUtils.errorVoid(new RuntimeException("No 'fireAndForget' handler"));

    /** Default channel handler: answers with an error built by {@code PublisherUtils.errorPayload}. */
    public static final Function<Publisher<Payload>, Publisher<Payload>> NO_REQUEST_CHANNEL_HANDLER =
        payloads -> PublisherUtils.errorPayload(new RuntimeException("No 'requestChannel' handler"));

    /** Default metadata-push handler: answers with an error built by {@code PublisherUtils.errorVoid}. */
    public static final Function<Payload, Publisher<Void>> NO_METADATA_PUSH_HANDLER =
        payload -> PublisherUtils.errorVoid(new RuntimeException("No 'metadataPush' handler"));

    /**
     * Handles a request/response interaction.
     *
     * @param payload the request payload
     * @return publisher of the response
     */
    public abstract Publisher<Payload> handleRequestResponse(final Payload payload);

    /**
     * Handles a request/stream interaction.
     *
     * @param payload the request payload
     * @return publisher of the response payloads
     */
    public abstract Publisher<Payload> handleRequestStream(final Payload payload);

    /**
     * Handles a subscription interaction.
     *
     * @param payload the request payload
     * @return publisher of the response payloads
     */
    public abstract Publisher<Payload> handleSubscription(final Payload payload);

    /**
     * Handles a fire-and-forget interaction.
     *
     * @param payload the request payload
     * @return publisher carrying no payloads
     */
    public abstract Publisher<Void> handleFireAndForget(final Payload payload);

    /**
     * Handles a channel interaction.
     *
     * <p>Note: the {@code initialPayload} will also be part of the
     * {@code inputs} publisher. It is there to simplify routing logic.
     *
     * @param initialPayload the first payload of the channel
     * @param inputs         publisher of the incoming payloads, including {@code initialPayload}
     * @return publisher of the outgoing payloads
     */
    public abstract Publisher<Payload> handleChannel(Payload initialPayload, final Publisher<Payload> inputs);

    /**
     * Handles a metadata-push interaction.
     *
     * @param payload the pushed payload
     * @return publisher carrying no payloads
     */
    public abstract Publisher<Void> handleMetadataPush(final Payload payload);

    /**
     * Assembles a {@link RequestHandler} from per-interaction functions.
     * Every slot starts out as the corresponding {@code NO_*_HANDLER} default.
     */
    public static class Builder {
        private Function<Payload, Publisher<Payload>> handleRequestResponse = NO_REQUEST_RESPONSE_HANDLER;
        private Function<Payload, Publisher<Payload>> handleRequestStream = NO_REQUEST_STREAM_HANDLER;
        private Function<Payload, Publisher<Payload>> handleRequestSubscription = NO_REQUEST_SUBSCRIPTION_HANDLER;
        private Function<Payload, Publisher<Void>> handleFireAndForget = NO_FIRE_AND_FORGET_HANDLER;
        private Function<Publisher<Payload>, Publisher<Payload>> handleRequestChannel = NO_REQUEST_CHANNEL_HANDLER;
        private Function<Payload, Publisher<Void>> handleMetadataPush = NO_METADATA_PUSH_HANDLER;

        /**
         * Sets the request/response function.
         *
         * @param handleRequestResponse function to invoke; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if the argument is {@code null}
         */
        public Builder withRequestResponse(final Function<Payload, Publisher<Payload>> handleRequestResponse) {
            this.handleRequestResponse = Objects.requireNonNull(handleRequestResponse, "handleRequestResponse");
            return this;
        }

        /**
         * Sets the request/stream function.
         *
         * @param handleRequestStream function to invoke; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if the argument is {@code null}
         */
        public Builder withRequestStream(final Function<Payload, Publisher<Payload>> handleRequestStream) {
            this.handleRequestStream = Objects.requireNonNull(handleRequestStream, "handleRequestStream");
            return this;
        }

        /**
         * Sets the subscription function.
         *
         * @param handleRequestSubscription function to invoke; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if the argument is {@code null}
         */
        public Builder withRequestSubscription(final Function<Payload, Publisher<Payload>> handleRequestSubscription) {
            this.handleRequestSubscription =
                Objects.requireNonNull(handleRequestSubscription, "handleRequestSubscription");
            return this;
        }

        /**
         * Sets the fire-and-forget function.
         *
         * @param handleFireAndForget function to invoke; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if the argument is {@code null}
         */
        public Builder withFireAndForget(final Function<Payload, Publisher<Void>> handleFireAndForget) {
            this.handleFireAndForget = Objects.requireNonNull(handleFireAndForget, "handleFireAndForget");
            return this;
        }

        /**
         * Sets the channel function. It receives only the inputs publisher;
         * the initial payload is not passed separately.
         *
         * @param handleRequestChannel function to invoke; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if the argument is {@code null}
         */
        public Builder withRequestChannel(final Function<Publisher<Payload>, Publisher<Payload>> handleRequestChannel) {
            this.handleRequestChannel = Objects.requireNonNull(handleRequestChannel, "handleRequestChannel");
            return this;
        }

        /**
         * Sets the metadata-push function.
         *
         * @param handleMetadataPush function to invoke; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if the argument is {@code null}
         */
        public Builder withMetadataPush(final Function<Payload, Publisher<Void>> handleMetadataPush) {
            this.handleMetadataPush = Objects.requireNonNull(handleMetadataPush, "handleMetadataPush");
            return this;
        }

        /**
         * Creates a {@link RequestHandler} that delegates each interaction to
         * the function currently configured on this builder.
         *
         * @return a new handler bound to the functions configured at the time of this call
         */
        public RequestHandler build() {
            // Snapshot the configured functions: reading the builder's fields lazily
            // would let later withXxx(...) calls alter a handler that was already built.
            final Function<Payload, Publisher<Payload>> requestResponse = handleRequestResponse;
            final Function<Payload, Publisher<Payload>> requestStream = handleRequestStream;
            final Function<Payload, Publisher<Payload>> requestSubscription = handleRequestSubscription;
            final Function<Payload, Publisher<Void>> fireAndForget = handleFireAndForget;
            final Function<Publisher<Payload>, Publisher<Payload>> requestChannel = handleRequestChannel;
            final Function<Payload, Publisher<Void>> metadataPush = handleMetadataPush;

            return new RequestHandler() {
                @Override
                public Publisher<Payload> handleRequestResponse(final Payload payload) {
                    return requestResponse.apply(payload);
                }

                @Override
                public Publisher<Payload> handleRequestStream(final Payload payload) {
                    return requestStream.apply(payload);
                }

                @Override
                public Publisher<Payload> handleSubscription(final Payload payload) {
                    return requestSubscription.apply(payload);
                }

                @Override
                public Publisher<Void> handleFireAndForget(final Payload payload) {
                    return fireAndForget.apply(payload);
                }

                @Override
                public Publisher<Payload> handleChannel(Payload initialPayload, final Publisher<Payload> inputs) {
                    // initialPayload is intentionally not forwarded: the channel
                    // function takes only the inputs publisher.
                    return requestChannel.apply(inputs);
                }

                @Override
                public Publisher<Void> handleMetadataPush(final Payload payload) {
                    return metadataPush.apply(payload);
                }
            };
        }
    }
}