×

Welcome to TagMyCode

Please login or create account to add a snippet.
0
0
 
0
Language: Java
Posted by: userb0319
Added: Nov 29, 2017 4:40 AM
Views: 6
Tags: 123
????
  1. /*
  2.  * Copyright 2012 The Netty Project
  3.  *
  4.  * The Netty Project licenses this file to you under the Apache License,
  5.  * version 2.0 (the "License"); you may not use this file except in compliance
  6.  * with the License. You may obtain a copy of the License at:
  7.  *
  8.  *   http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12.  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13.  * License for the specific language governing permissions and limitations
  14.  * under the License.
  15.  */
  16.  
  17. package io.netty.bootstrap;
  18.  
  19. import io.netty.channel.Channel;
  20. import io.netty.channel.ChannelFuture;
  21. import io.netty.channel.ChannelFutureListener;
  22. import io.netty.channel.ChannelHandler;
  23. import io.netty.channel.ChannelOption;
  24. import io.netty.channel.ChannelPromise;
  25. import io.netty.channel.DefaultChannelPromise;
  26. import io.netty.channel.EventLoop;
  27. import io.netty.channel.EventLoopGroup;
  28. import io.netty.channel.ReflectiveChannelFactory;
  29. import io.netty.util.internal.SocketUtils;
  30. import io.netty.util.AttributeKey;
  31. import io.netty.util.concurrent.EventExecutor;
  32. import io.netty.util.concurrent.GlobalEventExecutor;
  33. import io.netty.util.internal.StringUtil;
  34. import io.netty.util.internal.logging.InternalLogger;
  35.  
  36. import java.net.InetAddress;
  37. import java.net.InetSocketAddress;
  38. import java.net.SocketAddress;
  39. import java.util.Collections;
  40. import java.util.LinkedHashMap;
  41. import java.util.Map;
  42.  
  43. /**
  44.  * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
  45.  * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
  46.  *
  47.  * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
  48.  * transports such as datagram (UDP).</p>
  49.  */
  50. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
  51.  
  52.     volatile EventLoopGroup group;
  53.     @SuppressWarnings("deprecation")
  54.     private volatile ChannelFactory<? extends C> channelFactory;
  55.     private volatile SocketAddress localAddress;
  56.     private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
  57.     private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
  58.     private volatile ChannelHandler handler;
  59.  
  60.     AbstractBootstrap() {
  61.         // Disallow extending from a different package.
  62.     }
  63.  
  64.     AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
  65.         group = bootstrap.group;
  66.         channelFactory = bootstrap.channelFactory;
  67.         handler = bootstrap.handler;
  68.         localAddress = bootstrap.localAddress;
  69.         synchronized (bootstrap.options) {
  70.             options.putAll(bootstrap.options);
  71.         }
  72.         synchronized (bootstrap.attrs) {
  73.             attrs.putAll(bootstrap.attrs);
  74.         }
  75.     }
  76.  
  77.     /**
  78.      * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
  79.      * {@link Channel}
  80.      */
  81.     public B group(EventLoopGroup group) {
  82.         if (group == null) {
  83.             throw new NullPointerException("group");
  84.         }
  85.         if (this.group != null) {
  86.             throw new IllegalStateException("group set already");
  87.         }
  88.         this.group = group;
  89.         return self();
  90.     }
  91.  
  92.     @SuppressWarnings("unchecked")
  93.     private B self() {
  94.         return (B) this;
  95.     }
  96.  
  97.     /**
  98.      * The {@link Class} which is used to create {@link Channel} instances from.
  99.      * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
  100.      * {@link Channel} implementation has no no-args constructor.
  101.      */
  102.     public B channel(Class<? extends C> channelClass) {
  103.         if (channelClass == null) {
  104.             throw new NullPointerException("channelClass");
  105.         }
  106.         return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
  107.     }
  108.  
  109.     /**
  110.      * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
  111.      */
  112.     @Deprecated
  113.     public B channelFactory(ChannelFactory<? extends C> channelFactory) {
  114.         if (channelFactory == null) {
  115.             throw new NullPointerException("channelFactory");
  116.         }
  117.         if (this.channelFactory != null) {
  118.             throw new IllegalStateException("channelFactory set already");
  119.         }
  120.  
  121.         this.channelFactory = channelFactory;
  122.         return self();
  123.     }
  124.  
  125.     /**
  126.      * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
  127.      * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
  128.      * is not working for you because of some more complex needs. If your {@link Channel} implementation
  129.      * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
  130.      * simplify your code.
  131.      */
  132.     @SuppressWarnings({ "unchecked", "deprecation" })
  133.     public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
  134.         return channelFactory((ChannelFactory<C>) channelFactory);
  135.     }
  136.  
  137.     /**
  138.      * The {@link SocketAddress} which is used to bind the local "end" to.
  139.      */
  140.     public B localAddress(SocketAddress localAddress) {
  141.         this.localAddress = localAddress;
  142.         return self();
  143.     }
  144.  
  145.     /**
  146.      * @see #localAddress(SocketAddress)
  147.      */
  148.     public B localAddress(int inetPort) {
  149.         return localAddress(new InetSocketAddress(inetPort));
  150.     }
  151.  
  152.     /**
  153.      * @see #localAddress(SocketAddress)
  154.      */
  155.     public B localAddress(String inetHost, int inetPort) {
  156.         return localAddress(SocketUtils.socketAddress(inetHost, inetPort));
  157.     }
  158.  
  159.     /**
  160.      * @see #localAddress(SocketAddress)
  161.      */
  162.     public B localAddress(InetAddress inetHost, int inetPort) {
  163.         return localAddress(new InetSocketAddress(inetHost, inetPort));
  164.     }
  165.  
  166.     /**
  167.      * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
  168.      * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
  169.      */
  170.     public <T> B option(ChannelOption<T> option, T value) {
  171.         if (option == null) {
  172.             throw new NullPointerException("option");
  173.         }
  174.         if (value == null) {
  175.             synchronized (options) {
  176.                 options.remove(option);
  177.             }
  178.         } else {
  179.             synchronized (options) {
  180.                 options.put(option, value);
  181.             }
  182.         }
  183.         return self();
  184.     }
  185.  
  186.     /**
  187.      * Allow to specify an initial attribute of the newly created {@link Channel}.  If the {@code value} is
  188.      * {@code null}, the attribute of the specified {@code key} is removed.
  189.      */
  190.     public <T> B attr(AttributeKey<T> key, T value) {
  191.         if (key == null) {
  192.             throw new NullPointerException("key");
  193.         }
  194.         if (value == null) {
  195.             synchronized (attrs) {
  196.                 attrs.remove(key);
  197.             }
  198.         } else {
  199.             synchronized (attrs) {
  200.                 attrs.put(key, value);
  201.             }
  202.         }
  203.         return self();
  204.     }
  205.  
  206.     /**
  207.      * Validate all the parameters. Sub-classes may override this, but should
  208.      * call the super method in that case.
  209.      */
  210.     public B validate() {
  211.         if (group == null) {
  212.             throw new IllegalStateException("group not set");
  213.         }
  214.         if (channelFactory == null) {
  215.             throw new IllegalStateException("channel or channelFactory not set");
  216.         }
  217.         return self();
  218.     }
  219.  
  220.     /**
  221.      * Returns a deep clone of this bootstrap which has the identical configuration.  This method is useful when making
  222.      * multiple {@link Channel}s with similar settings.  Please note that this method does not clone the
  223.      * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource.
  224.      */
  225.     @Override
  226.     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
  227.     public abstract B clone();
  228.  
  229.     /**
  230.      * Create a new {@link Channel} and register it with an {@link EventLoop}.
  231.      */
  232.     public ChannelFuture register() {
  233.         validate();
  234.         return initAndRegister();
  235.     }
  236.  
  237.     /**
  238.      * Create a new {@link Channel} and bind it.
  239.      */
  240.     public ChannelFuture bind() {
  241.         validate();
  242.         SocketAddress localAddress = this.localAddress;
  243.         if (localAddress == null) {
  244.             throw new IllegalStateException("localAddress not set");
  245.         }
  246.         return doBind(localAddress);
  247.     }
  248.  
  249.     /**
  250.      * Create a new {@link Channel} and bind it.
  251.      */
  252.     public ChannelFuture bind(int inetPort) {
  253.         return bind(new InetSocketAddress(inetPort));
  254.     }
  255.  
  256.     /**
  257.      * Create a new {@link Channel} and bind it.
  258.      */
  259.     public ChannelFuture bind(String inetHost, int inetPort) {
  260.         return bind(SocketUtils.socketAddress(inetHost, inetPort));
  261.     }
  262.  
  263.     /**
  264.      * Create a new {@link Channel} and bind it.
  265.      */
  266.     public ChannelFuture bind(InetAddress inetHost, int inetPort) {
  267.         return bind(new InetSocketAddress(inetHost, inetPort));
  268.     }
  269.  
  270.     /**
  271.      * Create a new {@link Channel} and bind it.
  272.      */
  273.     public ChannelFuture bind(SocketAddress localAddress) {
  274.         validate();
  275.         if (localAddress == null) {
  276.             throw new NullPointerException("localAddress");
  277.         }
  278.         return doBind(localAddress);
  279.     }
  280.  
  281.     private ChannelFuture doBind(final SocketAddress localAddress) {
  282.         final ChannelFuture regFuture = initAndRegister();
  283.         final Channel channel = regFuture.channel();
  284.         if (regFuture.cause() != null) {
  285.             return regFuture;
  286.         }
  287.  
  288.         if (regFuture.isDone()) {
  289.             // At this point we know that the registration was complete and successful.
  290.             ChannelPromise promise = channel.newPromise();
  291.             doBind0(regFuture, channel, localAddress, promise);
  292.             return promise;
  293.         } else {
  294.             // Registration future is almost always fulfilled already, but just in case it's not.
  295.             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
  296.             regFuture.addListener(new ChannelFutureListener() {
  297.                 @Override
  298.                 public void operationComplete(ChannelFuture future) throws Exception {
  299.                     Throwable cause = future.cause();
  300.                     if (cause != null) {
  301.                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
  302.                         // IllegalStateException once we try to access the EventLoop of the Channel.
  303.                         promise.setFailure(cause);
  304.                     } else {
  305.                         // Registration was successful, so set the correct executor to use.
  306.                         // See https://github.com/netty/netty/issues/2586
  307.                         promise.registered();
  308.  
  309.                         doBind0(regFuture, channel, localAddress, promise);
  310.                     }
  311.                 }
  312.             });
  313.             return promise;
  314.         }
  315.     }
  316.  
  317.     final ChannelFuture initAndRegister() {
  318.         Channel channel = null;
  319.         try {
  320.             channel = channelFactory.newChannel();
  321.             init(channel);
  322.         } catch (Throwable t) {
  323.             if (channel != null) {
  324.                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
  325.                 channel.unsafe().closeForcibly();
  326.             }
  327.             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
  328.             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
  329.         }
  330.  
  331.         ChannelFuture regFuture = config().group().register(channel);
  332.         if (regFuture.cause() != null) {
  333.             if (channel.isRegistered()) {
  334.                 channel.close();
  335.             } else {
  336.                 channel.unsafe().closeForcibly();
  337.             }
  338.         }
  339.  
  340.         // If we are here and the promise is not failed, it's one of the following cases:
  341.         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
  342.         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
  343.         // 2) If we attempted registration from the other thread, the registration request has been successfully
  344.         //    added to the event loop's task queue for later execution.
  345.         //    i.e. It's safe to attempt bind() or connect() now:
  346.         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
  347.         //         because register(), bind(), and connect() are all bound to the same thread.
  348.  
  349.         return regFuture;
  350.     }
  351.  
  352.     abstract void init(Channel channel) throws Exception;
  353.  
  354.     private static void doBind0(
  355.             final ChannelFuture regFuture, final Channel channel,
  356.             final SocketAddress localAddress, final ChannelPromise promise) {
  357.  
  358.         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
  359.         // the pipeline in its channelRegistered() implementation.
  360.         channel.eventLoop().execute(new Runnable() {
  361.             @Override
  362.             public void run() {
  363.                 if (regFuture.isSuccess()) {
  364.                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  365.                 } else {
  366.                     promise.setFailure(regFuture.cause());
  367.                 }
  368.             }
  369.         });
  370.     }
  371.  
  372.     /**
  373.      * the {@link ChannelHandler} to use for serving the requests.
  374.      */
  375.     public B handler(ChannelHandler handler) {
  376.         if (handler == null) {
  377.             throw new NullPointerException("handler");
  378.         }
  379.         this.handler = handler;
  380.         return self();
  381.     }
  382.  
  383.     /**
  384.      * Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet.
  385.      *
  386.      * @deprecated Use {@link #config()} instead.
  387.      */
  388.     @Deprecated
  389.     public final EventLoopGroup group() {
  390.         return group;
  391.     }
  392.  
  393.     /**
  394.      * Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config
  395.      * of the bootstrap.
  396.      */
  397.     public abstract AbstractBootstrapConfig<B, C> config();
  398.  
  399.     static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
  400.         final Map<K, V> copied;
  401.         synchronized (map) {
  402.             if (map.isEmpty()) {
  403.                 return Collections.emptyMap();
  404.             }
  405.             copied = new LinkedHashMap<K, V>(map);
  406.         }
  407.         return Collections.unmodifiableMap(copied);
  408.     }
  409.  
  410.     final Map<ChannelOption<?>, Object> options0() {
  411.         return options;
  412.     }
  413.  
  414.     final Map<AttributeKey<?>, Object> attrs0() {
  415.         return attrs;
  416.     }
  417.  
  418.     final SocketAddress localAddress() {
  419.         return localAddress;
  420.     }
  421.  
  422.     @SuppressWarnings("deprecation")
  423.     final ChannelFactory<? extends C> channelFactory() {
  424.         return channelFactory;
  425.     }
  426.  
  427.     final ChannelHandler handler() {
  428.         return handler;
  429.     }
  430.  
  431.     final Map<ChannelOption<?>, Object> options() {
  432.         return copiedMap(options);
  433.     }
  434.  
  435.     final Map<AttributeKey<?>, Object> attrs() {
  436.         return copiedMap(attrs);
  437.     }
  438.  
  439.     static void setChannelOptions(
  440.             Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
  441.         for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
  442.             setChannelOption(channel, e.getKey(), e.getValue(), logger);
  443.         }
  444.     }
  445.  
  446.     static void setChannelOptions(
  447.             Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
  448.         for (Map.Entry<ChannelOption<?>, Object> e: options) {
  449.             setChannelOption(channel, e.getKey(), e.getValue(), logger);
  450.         }
  451.     }
  452.  
  453.     @SuppressWarnings("unchecked")
  454.     private static void setChannelOption(
  455.             Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
  456.         try {
  457.             if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
  458.                 logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
  459.             }
  460.         } catch (Throwable t) {
  461.             logger.warn(
  462.                     "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
  463.         }
  464.     }
  465.  
  466.     @Override
  467.     public String toString() {
  468.         StringBuilder buf = new StringBuilder()
  469.             .append(StringUtil.simpleClassName(this))
  470.             .append('(').append(config()).append(')');
  471.         return buf.toString();
  472.     }
  473.  
  474.     static final class PendingRegistrationPromise extends DefaultChannelPromise {
  475.  
  476.         // Is set to the correct EventExecutor once the registration was successful. Otherwise it will
  477.         // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications.
  478.         private volatile boolean registered;
  479.  
  480.         PendingRegistrationPromise(Channel channel) {
  481.             super(channel);
  482.         }
  483.  
  484.         void registered() {
  485.             registered = true;
  486.         }
  487.  
  488.         @Override
  489.         protected EventExecutor executor() {
  490.             if (registered) {
  491.                 // If the registration was a success executor is set.
  492.                 //
  493.                 // See https://github.com/netty/netty/issues/2586
  494.                 return super.executor();
  495.             }
  496.             // The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
  497.             return GlobalEventExecutor.INSTANCE;
  498.         }
  499.     }
  500. }
  501.