1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.async.socket;
20 
21 import core.thread : Fiber;
22 import core.time : dur, Duration;
23 import std.array : empty;
24 import std.conv : to;
25 import std.exception : enforce;
26 import std.socket;
27 import thrift.base;
28 import thrift.async.base;
29 import thrift.transport.base;
30 import thrift.transport.socket : TSocketBase;
31 import thrift.internal.endian;
32 import thrift.internal.socket;
33 
34 version (Windows) {
35   import std.c.windows.winsock : connect;
36 } else version (Posix) {
37   import core.sys.posix.sys.socket : connect;
38 } else static assert(0, "Don't know connect on this platform.");
39 
/**
 * Non-blocking socket implementation of the TTransport interface.
 *
 * Whenever a socket operation would block, TAsyncSocket registers a callback
 * with the specified TAsyncSocketManager and yields.
 *
 * As for thrift.transport.socket, due to the limitations of std.socket,
 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
 * not).
 */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /// The async manager all blocking operations of this socket are scheduled
  /// on.
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If connecting fails, this is
   * currently not indicated in any way other than an info log message and
   * every call to read/write failing.
   *
   * Throws: TTransportException (NOT_OPEN) if host or port are not set, the
   *   host cannot be resolved, or connect() fails right away. In these cases
   *   the newly created socket is discarded again, so open() can be retried.
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));

    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      auto addrs = getAddress(host_, port_);
      // Indexing an empty result would raise a RangeError (an Error, not an
      // Exception), which would slip past the catch clause below.
      enforce(!addrs.empty, "No addresses returned for host.");
      addr = addrs[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);

    // If anything below throws, discard the half-initialized socket again.
    // Otherwise it would stay in socket_, and (presumably isOpen reflects
    // whether socket_ is set, as closeImmediately() suggests) the transport
    // would look open, turning every further open() call into a no-op.
    scope (failure) closeImmediately();

    socket_.blocking = false;
    setSocketOpts();

    auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
    if (errorCode == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    auto connectErrno = getSocketErrno();
    if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(connectErrno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        TAsyncEventReason reason = void;
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          logInfo("Could not connect TAsyncSocket: timed out after %s.",
            connectTimeout);

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // Once the socket has become writable, SO_ERROR holds the result of
        // the background connect (zero on success).
        int socketError = void;
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
          socketError);

        if (socketError) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(socketError));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }
      }
    );
  }

  /**
   * Closes the socket.
   *
   * Will block until all currently active operations are finished before the
   * socket is closed.
   *
   * NOTE(review): the closing work item is run by asyncManager_, so calling
   * this from inside one of that manager's work items presumably deadlocks.
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);
    bool done = false;
    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            // Always signal completion, even if closing throws. Otherwise
            // close() would wait forever.
            scope (exit) {
              synchronized (doneMutex) {
                done = true;
                doneCond.notifyAll();
              }
            }
            closeImmediately();
          }
        )
      );

      // Loop to guard against spurious wakeups: we must not return before
      // the work item has run, because its (scoped) delegate context may
      // live in this stack frame.
      while (!done) doneCond.wait();
    }
  }

  /**
   * Checks whether the connection is still usable, without consuming data.
   *
   * The socket is non-blocking, so this never waits for data to arrive.
   *
   * Returns: false if the socket is closed or the peer has shut down the
   *   connection; true if data is pending, or if nothing has arrived yet but
   *   the connection is still open.
   */
  override bool peek() {
    if (!isOpen) return false;

    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();

      // On a non-blocking socket, "would block" just means that no data is
      // pending right now – the connection itself is fine.
      if (lastErrno == WOULD_BLOCK_ERRNO) return true;

      static if (connresetOnPeerShutdown) {
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes, yielding the current fiber while no data
   * is available.
   *
   * Returns: The number of bytes read; 0 signals that the peer closed the
   *   connection.
   * Throws: TTransportException – NOT_OPEN if the socket is (or turns out
   *   to be) closed, TIMED_OUT if no data arrived within the receive timeout,
   *   UNKNOWN for other socket errors.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    auto lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // On these platforms, a shutdown of the connection by the peer is
      // reported as ECONNRESET; treat it like end of stream.
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    if (isSocketCloseErrno(lastErrno)) {
      // The connection is gone. Close our end too, so that queued-up work
      // items fail immediately. This mirrors writeSome().
      closeImmediately();
      throw new TTransportException("Receiving from socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.NOT_OPEN);
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes the whole buffer, calling writeSome() until everything is sent.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Writes as much of buf as the socket accepts in one send() call, yielding
   * the current fiber while the socket is not writable.
   *
   * Returns: The number of bytes written (0 only for an empty buffer).
   * Throws: TTransportException – NOT_OPEN if the socket is (or turns out
   *   to be) closed, TIMED_OUT if the socket did not become writable within
   *   the send timeout, UNKNOWN for other errors.
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    // send() with an empty buffer returns 0, which would otherwise be
    // misreported as an error below.
    if (buf.empty) return 0;

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// open() call times out.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /**
   * Closes the underlying socket right away, without waiting for queued-up
   * work items.
   *
   * Idempotent: a failed/timed-out connect, read or write may already have
   * closed the socket before a queued close() work item gets to run.
   */
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /**
   * Evaluates call, and while it fails with WOULD_BLOCK_ERRNO, registers a
   * oneshot listener for eventType with the async manager and yields the
   * current fiber until the socket is ready. Then it retries.
   *
   * Returns: The first result of call that is not a "would block" error.
   * Throws: TTransportException (TIMED_OUT) if the receive/send timeout
   *   expires; the socket is closed in that case.
   */
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}
337 
private {
  // std.socket doesn't include SO_ERROR for reasons unknown. It is needed by
  // TAsyncSocket.open() to query the outcome of a non-blocking connect().
  version (linux) {
    enum SO_ERROR = 4;
  } else version (OSX) {
    enum SO_ERROR = 0x1007;
  } else version (FreeBSD) {
    enum SO_ERROR = 0x1007;
  } else version (Windows) {
    // `Windows` (unlike `Win32`) is also defined for 64-bit Windows builds,
    // matching the version check used for the connect() import.
    import std.c.windows.winsock : SO_ERROR;
  } else static assert(false, "Don't know SO_ERROR on this platform.");

  // This hack forces a delegate literal to be scoped, even if it is passed to
  // a function accepting normal delegates as well. DMD likes to allocate the
  // context on the heap anyway, but it seems to work for LDC.
  import std.traits : isDelegate;

  /// Returns d unchanged; only the `scope` parameter storage class matters.
  auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
    return d;
  }
}