1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.async.socket; 20 21 import core.thread : Fiber; 22 import core.time : dur, Duration; 23 import std.array : empty; 24 import std.conv : to; 25 import std.exception : enforce; 26 import std.socket; 27 import thrift.base; 28 import thrift.async.base; 29 import thrift.transport.base; 30 import thrift.transport.socket : TSocketBase; 31 import thrift.internal.endian; 32 import thrift.internal.socket; 33 34 version (Windows) { 35 import std.c.windows.winsock : connect; 36 } else version (Posix) { 37 import core.sys.posix.sys.socket : connect; 38 } else static assert(0, "Don't know connect on this platform."); 39 40 /** 41 * Non-blocking socket implementation of the TTransport interface. 42 * 43 * Whenever a socket operation would block, TAsyncSocket registers a callback 44 * with the specified TAsyncSocketManager and yields. 45 * 46 * As for thrift.transport.socket, due to the limitations of std.socket, 47 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are 48 * not). 49 */ 50 class TAsyncSocket : TSocketBase, TAsyncTransport { 51 /** 52 * Constructor that takes an already created, connected (!) socket. 53 * 54 * Params: 55 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. 56 * socket = Already created, connected socket object. Will be switched to 57 * non-blocking mode if it isn't already. 58 */ 59 this(TAsyncSocketManager asyncManager, Socket socket) { 60 asyncManager_ = asyncManager; 61 socket.blocking = false; 62 super(socket); 63 } 64 65 /** 66 * Creates a new unconnected socket that will connect to the given host 67 * on the given port. 68 * 69 * Params: 70 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. 71 * host = Remote host. 72 * port = Remote port. 73 */ 74 this(TAsyncSocketManager asyncManager, string host, ushort port) { 75 asyncManager_ = asyncManager; 76 super(host, port); 77 } 78 79 override TAsyncManager asyncManager() @property { 80 return asyncManager_; 81 } 82 83 /** 84 * Asynchronously connects the socket. 85 * 86 * Completes without blocking and defers further operations on the socket 87 * until the connection is established. If connecting fails, this is 88 * currently not indicated in any way other than every call to read/write 89 * failing. 90 */ 91 override void open() { 92 if (isOpen) return; 93 94 enforce(!host_.empty, new TTransportException( 95 "Cannot open null host.", TTransportException.Type.NOT_OPEN)); 96 enforce(port_ != 0, new TTransportException( 97 "Cannot open with null port.", TTransportException.Type.NOT_OPEN)); 98 99 100 // Cannot use std.socket.Socket.connect here because it hides away 101 // EINPROGRESS/WSAWOULDBLOCK. 102 Address addr; 103 try { 104 // Currently, we just go with the first address returned, could be made 105 // more intelligent though – IPv6? 106 addr = getAddress(host_, port_)[0]; 107 } catch (Exception e) { 108 throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`, 109 TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e); 110 } 111 112 socket_ = new TcpSocket(addr.addressFamily); 113 socket_.blocking = false; 114 setSocketOpts(); 115 116 auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen()); 117 if (errorCode == 0) { 118 // If the connection could be established immediately, just return. I 119 // don't know if this ever happens. 120 return; 121 } 122 123 auto errno = getSocketErrno(); 124 if (errno != CONNECT_INPROGRESS_ERRNO) { 125 throw new TTransportException(`Could not establish connection to "` ~ 126 host_ ~ `": ` ~ socketErrnoString(errno), 127 TTransportException.Type.NOT_OPEN); 128 } 129 130 // This is the expected case: connect() signalled that the connection 131 // is being established in the background. Queue up a work item with the 132 // async manager which just defers any other operations on this 133 // TAsyncSocket instance until the socket is ready. 134 asyncManager_.execute(this, 135 { 136 auto fiber = Fiber.getThis(); 137 TAsyncEventReason reason = void; 138 asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE, 139 connectTimeout, 140 scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); }) 141 ); 142 Fiber.yield(); 143 144 if (reason == TAsyncEventReason.TIMED_OUT) { 145 // Close the connection, so that subsequent work items fail immediately. 146 closeImmediately(); 147 return; 148 } 149 150 int errorCode = void; 151 socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR, 152 errorCode); 153 154 if (errorCode) { 155 logInfo("Could not connect TAsyncSocket: %s", 156 socketErrnoString(errorCode)); 157 158 // Close the connection, so that subsequent work items fail immediately. 159 closeImmediately(); 160 return; 161 } 162 163 } 164 ); 165 } 166 167 /** 168 * Closes the socket. 169 * 170 * Will block until all currently active operations are finished before the 171 * socket is closed. 172 */ 173 override void close() { 174 if (!isOpen) return; 175 176 import core.sync.condition; 177 import core.sync.mutex; 178 179 auto doneMutex = new Mutex; 180 auto doneCond = new Condition(doneMutex); 181 synchronized (doneMutex) { 182 asyncManager_.execute(this, 183 scopedDelegate( 184 { 185 closeImmediately(); 186 synchronized (doneMutex) doneCond.notifyAll(); 187 } 188 ) 189 ); 190 doneCond.wait(); 191 } 192 } 193 194 override bool peek() { 195 if (!isOpen) return false; 196 197 ubyte buf; 198 auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK); 199 if (r == Socket.ERROR) { 200 auto lastErrno = getSocketErrno(); 201 static if (connresetOnPeerShutdown) { 202 if (lastErrno == ECONNRESET) { 203 closeImmediately(); 204 return false; 205 } 206 } 207 throw new TTransportException("Peeking into socket failed: " ~ 208 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); 209 } 210 return (r > 0); 211 } 212 213 override size_t read(ubyte[] buf) { 214 enforce(isOpen, new TTransportException( 215 "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN)); 216 217 typeof(getSocketErrno()) lastErrno; 218 219 auto r = yieldOnBlock(socket_.receive(cast(void[])buf), 220 TAsyncEventType.READ); 221 222 // If recv went fine, immediately return. 223 if (r >= 0) return r; 224 225 // Something went wrong, find out how to handle it. 226 lastErrno = getSocketErrno(); 227 228 static if (connresetOnPeerShutdown) { 229 // See top comment. 230 if (lastErrno == ECONNRESET) { 231 return 0; 232 } 233 } 234 235 throw new TTransportException("Receiving from socket failed: " ~ 236 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); 237 } 238 239 override void write(in ubyte[] buf) { 240 size_t sent; 241 while (sent < buf.length) { 242 sent += writeSome(buf[sent .. $]); 243 } 244 assert(sent == buf.length); 245 } 246 247 override size_t writeSome(in ubyte[] buf) { 248 enforce(isOpen, new TTransportException( 249 "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN)); 250 251 auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE); 252 253 // Everything went well, just return the number of bytes written. 254 if (r > 0) return r; 255 256 // Handle error conditions. 257 if (r < 0) { 258 auto lastErrno = getSocketErrno(); 259 260 auto type = TTransportException.Type.UNKNOWN; 261 if (isSocketCloseErrno(lastErrno)) { 262 type = TTransportException.Type.NOT_OPEN; 263 closeImmediately(); 264 } 265 266 throw new TTransportException("Sending to socket failed: " ~ 267 socketErrnoString(lastErrno), type); 268 } 269 270 // send() should never return 0. 271 throw new TTransportException("Sending to socket failed (0 bytes written).", 272 TTransportException.Type.UNKNOWN); 273 } 274 275 /// The amount of time in which a conncetion must be established before the 276 /// open() call times out. 277 Duration connectTimeout = dur!"seconds"(5); 278 279 private: 280 void closeImmediately() { 281 socket_.close(); 282 socket_ = null; 283 } 284 285 T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) { 286 while (true) { 287 auto result = call(); 288 if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result; 289 290 // We got an EAGAIN result, register a callback to return here once some 291 // event happens and yield. 292 293 Duration timeout = void; 294 final switch (eventType) { 295 case TAsyncEventType.READ: 296 timeout = recvTimeout_; 297 break; 298 case TAsyncEventType.WRITE: 299 timeout = sendTimeout_; 300 break; 301 } 302 303 auto fiber = Fiber.getThis(); 304 assert(fiber, "Current fiber null – not running in TAsyncManager?"); 305 TAsyncEventReason eventReason = void; 306 asyncManager_.addOneshotListener(socket_, eventType, timeout, 307 scopedDelegate((TAsyncEventReason reason) { 308 eventReason = reason; 309 fiber.call(); 310 }) 311 ); 312 313 // Yields execution back to the async manager, will return back here once 314 // the above listener is called. 315 Fiber.yield(); 316 317 if (eventReason == TAsyncEventReason.TIMED_OUT) { 318 // If we are cancelling the request due to a timed out operation, the 319 // connection is in an undefined state, because the server could decide 320 // to send the requested data later, or we could have already been half- 321 // way into writing a request. Thus, we close the connection to make any 322 // possibly queued up work items fail immediately. Besides, the server 323 // is not very likely to immediately recover after a socket-level 324 // timeout has expired anyway. 325 closeImmediately(); 326 327 throw new TTransportException("Timed out while waiting for socket " ~ 328 "to get ready to " ~ to!string(eventType) ~ ".", 329 TTransportException.Type.TIMED_OUT); 330 } 331 } 332 } 333 334 /// The TAsyncSocketManager to use for non-blocking I/O. 335 TAsyncSocketManager asyncManager_; 336 } 337 338 private { 339 // std.socket doesn't include SO_ERROR for reasons unknown. 340 version (linux) { 341 enum SO_ERROR = 4; 342 } else version (OSX) { 343 enum SO_ERROR = 0x1007; 344 } else version (FreeBSD) { 345 enum SO_ERROR = 0x1007; 346 } else version (Win32) { 347 import std.c.windows.winsock : SO_ERROR; 348 } else static assert(false, "Don't know SO_ERROR on this platform."); 349 350 // This hack forces a delegate literal to be scoped, even if it is passed to 351 // a function accepting normal delegates as well. DMD likes to allocate the 352 // context on the heap anyway, but it seems to work for LDC. 353 import std.traits : isDelegate; 354 auto scopedDelegate(D)(scope D d) if (isDelegate!D) { 355 return d; 356 } 357 }