1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */19 modulethrift.async.libevent;
20 21 importcore.atomic;
22 importcore.time : Duration, dur;
23 importcore.exception : onOutOfMemoryError;
24 importcore.memory : GC;
25 importcore.thread : Fiber, Thread;
26 importcore.sync.condition;
27 importcore.sync.mutex;
28 importcore.stdc.stdlib : free, malloc;
29 importdeimos.event2.event;
30 importstd.array : empty, front, popFront;
31 importstd.conv : text, to;
32 importstd.exception : enforce;
33 importstd.socket : Socket, socketPair;
34 importthrift.base;
35 importthrift.async.base;
36 importthrift.internal.socket;
37 importthrift.internal.traits;
38 importthrift.util.cancellation;
39 40 // To avoid DMD @@BUG6395@@.41 importthrift.internal.algorithm;
42 43 /**
44 * A TAsyncManager implementation based on libevent.
45 *
46 * The libevent loop for handling non-blocking sockets is run in a background
 * thread, which is lazily spawned. The thread is not daemonized to avoid
 * crashes on program shutdown; it is only stopped when the manager instance
 * is destroyed. So, to ensure a clean program teardown, either make sure this
 * instance gets destroyed (e.g. by using scope), or manually call stop() at
 * the end.
52 */53 classTLibeventAsyncManager : TAsyncSocketManager {
/**
 * Creates the libevent base, the synchronization primitives for the queued
 * item counter and the socket pair used to pass control messages to the
 * event loop. No worker thread is started here.
 */
this() {
  eventBase_ = event_base_new();

  // Guards queuedCount_ and lets stop() wait for it to reach zero.
  queuedCountMutex_ = new Mutex;
  zeroQueuedCondition_ = new Condition(queuedCountMutex_);

  // Control messages are written to one end of a socket pair and read from
  // the other (non-blocking) end by the event loop.
  auto sockets = socketPair();
  controlSendSocket_ = sockets[0];
  controlReceiveSocket_ = sockets[1];
  controlReceiveSocket_.blocking = false;

  // Persistently watch the receiving end; the manager instance is handed to
  // the callback as its user argument.
  auto onControlMsg = assumeNothrow(&controlMsgReceiveCallback);
  controlReceiveEvent_ = event_new(eventBase_, controlReceiveSocket_.handle,
    EV_READ | EV_PERSIST | EV_ET, onControlMsg, cast(void*)this);
  event_add(controlReceiveEvent_, null);
}
/**
 * Stops the worker thread without waiting for outstanding work and releases
 * the libevent resources owned by this instance.
 */
~this() {
  // Calling stop() is safe here: either no worker thread exists and the call
  // is a no-op, or the worker is still blocked in event_base_loop and thus
  // cannot have been garbage collected yet.
  auto noWait = dur!"hnsecs"(0);
  stop(noWait);

  event_free(controlReceiveEvent_);

  event_base_free(eventBase_);
  eventBase_ = null;
}
/**
 * Hands a work item for the given transport over to the event loop thread.
 *
 * Does nothing if the cancellation has already been triggered. Otherwise the
 * item is counted as queued, the worker thread is started if necessary and a
 * WORK control message is sent. If a cancellation is passed, a CANCEL message
 * for the same item is sent when it triggers.
 *
 * Params:
 *   transport = The transport the work item belongs to.
 *   work = The delegate to execute in the worker thread.
 *   cancellation = Optional cancellation for the work item.
 *
 * Throws: TException (from sendControlMsg) if the control message could not
 *   be sent completely. In that case the queued count and the GC root added
 *   for the delegate context are rolled back.
 */
override void execute(TAsyncTransport transport, Work work,
  TCancellation cancellation = null
) {
  if (cancellation && cancellation.triggered) return;

  // Keep track that there is a new work item to be processed.
  incrementQueuedCount();
  {
    // If anything below fails, the item never reaches the worker thread, so
    // it must not stay counted as queued (stop() would wait for it forever).
    scope (failure) decrementQueuedCount();

    ensureWorkerThreadRunning();

    // We should be able to send the control message as a whole – we
    // currently assume to be able to receive it at once as well. If this
    // proves to be unstable (e.g. send could possibly return early if the
    // receiving buffer is full and the blocking call gets interrupted by a
    // signal), it could be changed to a more sophisticated scheme.

    // Make sure the delegate context doesn't get GCd while the work item is
    // on the wire. The root is removed again by the receiver, or right here
    // if sending fails.
    GC.addRoot(work.ptr);
    scope (failure) GC.removeRoot(work.ptr);

    // Send work message.
    sendControlMsg(ControlMsg(MsgType.WORK, work, transport));
  }

  if (cancellation) {
    cancellation.triggering.addCallback({
      sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
    });
  }
}
/**
 * Schedules a delegate to be run in the event loop thread after the given
 * duration has passed. The pending delay is counted as a queued item until
 * the delegate has returned.
 *
 * Params:
 *   duration = How long to wait before invoking work.
 *   work = The delegate to invoke.
 */
override void delay(Duration duration, void delegate() work) {
  incrementQueuedCount();

  ensureWorkerThreadRunning();

  const tv = toTimeval(duration);

  // A pure timer must be requested with EV_TIMEOUT: event_base_once documents
  // EV_TIMEOUT as the event kind for timeouts, and an empty event mask is not
  // one of the combinations it accepts. delayCallback also asserts that it is
  // invoked with EV_TIMEOUT set.
  //
  // DMD @@BUG@@: Cannot deduce T to void delegate() here.
  registerOneshotEvent!(void delegate())(
    -1, EV_TIMEOUT, assumeNothrow(&delayCallback), &tv,
    {
      work();
      decrementQueuedCount();
    }
  );
}
/**
 * Shuts down the worker thread, optionally waiting for queued items first.
 *
 * Params:
 *   waitFinishTimeout = If positive, wait at most this long for the queued
 *     count to reach zero; if negative, wait without a time limit; if zero,
 *     shut down immediately.
 *
 * Returns: true if no worker thread was running or no items were left queued
 *   at the time of shutdown, false otherwise.
 */
override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
  const noTime = dur!"hnsecs"(0);
  bool drained = true;

  synchronized (this) {
    if (workerThread_ !is null) {
      synchronized (queuedCountMutex_) {
        if (waitFinishTimeout < noTime) {
          // Negative timeout: block until everything has been processed.
          while (queuedCount_ > 0) zeroQueuedCondition_.wait();
        } else if (waitFinishTimeout > noTime) {
          // Positive timeout: a single bounded wait, only if needed.
          if (queuedCount_ > 0) {
            zeroQueuedCondition_.wait(waitFinishTimeout);
          }
        }
        // A zero timeout falls through and exits immediately in all cases.
        drained = (queuedCount_ == 0);
      }

      // Ask the loop to exit and wake it up with a SHUTDOWN message, then
      // wait for the thread to finish.
      event_base_loopbreak(eventBase_);
      sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
      workerThread_.join();

      // All currently enqueued items are discarded, so the count is reset to
      // zero. This is safe to do without locking, since the worker thread is
      // down.
      workQueues_ = null;
      queuedCount_ = 0;
      atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
    }
  }

  return drained;
}
/**
 * Registers a listener that is invoked once when the given event type occurs
 * on the socket. No timeout is applied.
 */
override void addOneshotListener(Socket socket, TAsyncEventType eventType,
  TSocketEventListener listener
) {
  const(timeval)* noTimeout = null;
  addOneshotListenerImpl(socket, eventType, noTimeout, listener);
}
/**
 * Registers a listener that is invoked once when the given event type occurs
 * on the socket or when the timeout expires. A zero or negative timeout
 * registers the listener without any timeout.
 */
override void addOneshotListener(Socket socket, TAsyncEventType eventType,
  Duration timeout, TSocketEventListener listener
) {
  if (timeout > dur!"hnsecs"(0)) {
    // This is not really documented well, but libevent does not require to
    // keep the timeval around after the event was added, so a stack-local
    // value is sufficient.
    auto limit = toTimeval(timeout);
    addOneshotListenerImpl(socket, eventType, &limit, listener);
    return;
  }

  addOneshotListenerImpl(socket, eventType, null, listener);
}
184 185 private:
/// A unit of work handed to execute() and run in the worker thread.
alias void delegate() Work;
/**
 * Shared implementation of the addOneshotListener overloads: translates the
 * event type to libevent flags and registers socketCallback with the
 * listener as its payload.
 *
 * Params:
 *   timeout = Timeout to pass to libevent, or null for none.
 */
void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
  const(timeval)* timeout, TSocketEventListener listener
) {
  auto flags = libeventEventType(eventType);
  auto onEvent = assumeNothrow(&socketCallback);
  registerOneshotEvent(socket.handle, flags, onEvent, timeout, listener);
}
/**
 * Registers a one-shot libevent event whose callback receives a C heap copy
 * of the given payload as its argument.
 *
 * The copy is registered with the GC as a range so anything the payload
 * references stays alive. The callback is expected to remove the range and
 * free the memory once it has run.
 *
 * Params:
 *   fd = The socket to watch, or -1 for pure timers.
 *   type = The libevent event flags.
 *   callback = The C callback to invoke; it receives the payload copy.
 *   timeout = Optional timeout, may be null.
 *   payload = The value to copy and hand to the callback.
 */
void registerOneshotEvent(T)(evutil_socket_t fd, short type,
  event_callback_fn callback, const(timeval)* timeout, T payload
) {
  // Create a copy of the payload on the C heap.
  auto payloadMem = malloc(payload.sizeof);
  if (!payloadMem) onOutOfMemoryError();
  (cast(T*)payloadMem)[0 .. 1] = payload;
  GC.addRange(payloadMem, payload.sizeof);

  auto result = event_base_once(eventBase_, fd, type, callback,
    payloadMem, timeout);

  // Assuming that we didn't get our arguments wrong above, the only other
  // situation in which event_base_once can fail is when it can't allocate
  // memory.
  if (result != 0) {
    // The callback will never run, so nobody else would release the payload
    // copy or unregister its GC range.
    GC.removeRange(payloadMem);
    free(payloadMem);
    onOutOfMemoryError();
  }
}
/// Kinds of control messages sent to the event loop thread.
enum MsgType : ubyte {
  /// Carries no work; only wakes the event loop up for shutdown.
  SHUTDOWN,
  /// A new work item to enqueue for a transport.
  WORK,
  /// Removes a not yet started work item from a transport's queue.
  CANCEL
}
/// A message as written to and read from the control socket pair. It is sent
/// as its raw in-memory representation.
struct ControlMsg {
  /// What the receiver should do with the message.
  MsgType type;
  /// The work item concerned (left at its default for SHUTDOWN).
  Work work;
  /// The transport the work item belongs to (left at its default for SHUTDOWN).
  TAsyncTransport transport;
}
/**
 * Spawns the thread running the libevent loop unless one already exists.
 */
void ensureWorkerThreadRunning() {
  // Technically, only half barriers would be required here, but adding the
  // argument seems to trigger a DMD template argument deduction @@BUG@@.
  if (atomicLoad(*(cast(shared)&workerThread_))) return;

  synchronized (this) {
    // Re-check under the lock; another caller may have started the thread in
    // the meantime.
    if (workerThread_) return;

    auto worker = new Thread({ event_base_loop(eventBase_, 0); });
    worker.start();
    atomicStore(*(cast(shared)&workerThread_), cast(shared)worker);
  }
}
/**
 * Writes a control message to the sending end of the control socket pair.
 *
 * Throws: TException if the number of bytes sent differs from the size of
 *   the message.
 */
void sendControlMsg(const(ControlMsg) msg) {
  enum size = msg.sizeof;
  auto sent = controlSendSocket_.send((&msg)[0 .. 1]);
  if (sent != size) {
    throw new TException(text(
      "Sending control message of type ", msg.type, " failed (", sent,
      " bytes instead of ", size, " transmitted)."));
  }
}
/**
 * Receives messages from the control message socket and acts on them. Called
 * from the worker thread.
 *
 * Messages are read until a receive call does not yield a complete message.
 * SHUTDOWN messages are ignored (they only wake the loop), WORK messages are
 * enqueued and, if the transport's queue was empty, executed right away, and
 * CANCEL messages drop all matching items that have not been started yet.
 */
void receiveControlMsg() {
  // Read as many new work items off the socket as possible (at least one
  // should be available, as we got notified by libevent).
  ControlMsg msg;
  ptrdiff_t bytesRead;
  while (true) {
    bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));

    if (bytesRead < 0) {
      auto errno = getSocketErrno();
      if (errno != WOULD_BLOCK_ERRNO) {
        logError("Reading control message, some work item will possibly " ~
          "never be executed: %s", socketErrnoString(errno));
      }
    }
    if (bytesRead != msg.sizeof) break;

    // Everything went fine, we received a new control message.
    final switch (msg.type) {
      case MsgType.SHUTDOWN:
        // The message was just intended to wake us up for shutdown.
        break;

      case MsgType.CANCEL:
        // Look the queue up without indexing, so a cancellation for a
        // transport that has no queue (e.g. after stop() discarded all
        // queues) is ignored instead of raising a RangeError.
        auto pending = msg.transport in workQueues_;
        if (pending !is null && (*pending).length > 0) {
          // When processing a cancellation, we must not touch the first
          // item, since it is already being processed.
          auto kept = removeEqual((*pending)[1 .. $], msg.work);
          auto dropped = (*pending).length - 1 - kept.length;
          *pending = [(*pending)[0]] ~ kept;

          // The removed items will never run and hence never decrement the
          // queued count themselves; do it here so stop() is not left
          // waiting for them.
          foreach (i; 0 .. dropped) decrementQueuedCount();
        }
        break;

      case MsgType.WORK:
        // Now that the work item is back in the D world, we don't need the
        // extra GC root for the context pointer anymore (see execute()).
        GC.removeRoot(msg.work.ptr);

        // Add the work item to the queue and execute it.
        auto queue = msg.transport in workQueues_;
        if (queue is null || (*queue).empty) {
          // If the queue is empty, add the new work item to the queue as
          // well, but immediately start executing it.
          workQueues_[msg.transport] = [msg.work];
          executeWork(msg.transport, msg.work);
        } else {
          (*queue) ~= msg.work;
        }
        break;
    }
  }

  // If the last read was successful, but didn't read enough bytes, we got
  // a problem.
  if (bytesRead > 0) {
    logError("Unexpected partial control message read (%s byte(s) " ~
      "instead of %s), some work item will possibly never be executed.",
      bytesRead, msg.sizeof);
  }
}
/**
 * Runs the given work item, followed by every item that is waiting in the
 * queue of the same transport, inside a newly created fiber. Called from the
 * worker thread.
 */
void executeWork(TAsyncTransport transport, Work work) {
  auto runner = new Fiber({
    Work current = work;
    for (;;) {
      try {
        // Run the work item. It may register listeners with the event loop
        // and yield while waiting for blocking operations, during which
        // other fibers can modify this transport's queue.
        current();
      } catch (Exception e) {
        // This should never happen, just to be sure the worker thread
        // doesn't stop working in mysterious ways because of an unhandled
        // exception.
        logError("Exception thrown by work item: %s", e);
      }

      // Drop the finished item from the queue. Array slices have value
      // semantics, so the queue has to be looked up again on each iteration
      // and written back after modification.
      auto remaining = workQueues_[transport];
      assert(remaining.front == current);
      remaining.popFront();
      workQueues_[transport] = remaining;

      // The item is done and no longer counts as queued.
      decrementQueuedCount();

      if (remaining.empty) break;

      // Continue with the next waiting item.
      current = remaining.front;
    }
  });
  runner.call();
}
/**
 * Adds one to the number of queued items while holding queuedCountMutex_.
 */
void incrementQueuedCount() {
  queuedCountMutex_.lock();
  scope (exit) queuedCountMutex_.unlock();

  queuedCount_ += 1;
}
/**
 * Subtracts one from the number of queued items while holding
 * queuedCountMutex_, and wakes all waiters on zeroQueuedCondition_ when the
 * count reaches zero.
 */
void decrementQueuedCount() {
  queuedCountMutex_.lock();
  scope (exit) queuedCountMutex_.unlock();

  assert(queuedCount_ > 0);
  queuedCount_ -= 1;
  if (queuedCount_ != 0) return;

  zeroQueuedCondition_.notifyAll();
}
/**
 * libevent callback for the control socket. The user argument is the manager
 * instance registered in the constructor; its receiveControlMsg() is invoked.
 */
static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
  void *managerThis
) {
  auto manager = cast(TLibeventAsyncManager)managerThis;
  manager.receiveControlMsg();
}
/**
 * libevent callback for one-shot socket events. Invokes the listener stored
 * in the payload copy (see registerOneshotEvent) with the reason for the
 * notification, then releases the payload copy.
 *
 * Params:
 *   flags = The libevent flags for the triggered event; EV_TIMEOUT maps to
 *     TAsyncEventReason.TIMED_OUT, everything else to NORMAL.
 *   arg = Pointer to the C heap copy of the TSocketEventListener.
 */
static extern(C) void socketCallback(evutil_socket_t, short flags,
  void *arg
) {
  auto listener = cast(TSocketEventListener*)arg;
  auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
    TAsyncEventReason.NORMAL;
  (*listener)(reason);

  GC.removeRange(arg);
  // Reset the stored listener itself. Clearing `arg` instead would merely
  // null the local pointer, turning the free() below into free(null) and
  // leaking the payload.
  *listener = TSocketEventListener.init;
  free(arg);
}
/**
 * libevent callback for delayed work. Invokes the delegate stored in the
 * payload copy (see registerOneshotEvent), then releases the payload copy.
 *
 * Params:
 *   flags = The libevent flags for the triggered event; expected to contain
 *     EV_TIMEOUT.
 *   arg = Pointer to the C heap copy of the void delegate() to run.
 */
static extern(C) void delayCallback(evutil_socket_t, short flags,
  void *arg
) {
  assert(flags & EV_TIMEOUT);
  auto work = cast(void delegate()*)arg;
  (*work)();

  GC.removeRange(arg);
  // Reset the stored delegate itself. Clearing `arg` instead would merely
  // null the local pointer, turning the free() below into free(null) and
  // leaking the payload.
  *work = null;
  free(arg);
}
/// The thread running the libevent loop, or null if none has been started
/// (or it has been stopped again).
Thread workerThread_;

/// The libevent base all events of this manager are registered with.
event_base* eventBase_;

/// The socket control messages are read from in the event loop. Paired
/// with controlSendSocket_. SHUTDOWN messages carry no work and only serve
/// to wake up the worker thread.
Socket controlReceiveSocket_;

/// The persistent libevent event watching controlReceiveSocket_.
event* controlReceiveEvent_;

/// The socket used to send control messages to the event loop. It is
/// expected that messages can always be read at once from the other end,
/// i.e. that there will never be short reads.
Socket controlSendSocket_;

/// Queued up work delegates for async transports. This also includes
/// currently active ones, they are removed from the queue on completion,
/// which is relied on by the control message receive fiber (the main one)
/// to decide whether to immediately start executing items or not.
// TODO: This should really be of some queue type, not an array slice, but
// std.container doesn't have anything.
Work[][TAsyncTransport] workQueues_;

/// The total number of work items not yet finished (queued and currently
/// executed) and delays not yet executed.
uint queuedCount_;

/// Protects queuedCount_.
Mutex queuedCountMutex_;

/// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
Condition zeroQueuedCondition_;
441 }
442 443 private {
/**
 * Converts a Duration to a timeval with microsecond resolution.
 *
 * The whole seconds are converted to the native type of tv_sec rather than
 * to int, so long durations are not truncated on platforms where tv_sec is
 * wider than 32 bits.
 */
timeval toTimeval(const(Duration) dur) {
  const wholeSeconds = dur.total!"seconds"();
  // Microseconds left over after removing the whole seconds.
  const restUsecs = dur.total!"usecs"() - wholeSeconds * 1_000_000;

  timeval tv;
  tv.tv_sec = cast(typeof(tv.tv_sec))wholeSeconds;
  tv.tv_usec = cast(typeof(tv.tv_usec))restUsecs;
  return tv;
}
/**
 * Maps a TAsyncEventType to the libevent flags used to wait for it: the
 * matching read/write flag combined with EV_ET.
 */
short libeventEventType(TAsyncEventType type) {
  enum short readFlags = EV_READ | EV_ET;
  enum short writeFlags = EV_WRITE | EV_ET;

  final switch (type) {
    case TAsyncEventType.READ:
      return readFlags;
    case TAsyncEventType.WRITE:
      return writeFlags;
  }
}
461 }