1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.internal.resource_pool;
20 
21 import core.time : Duration, dur, TickDuration;
22 import std.algorithm : minPos, reduce, remove;
23 import std.array : array, empty;
24 import std.exception : enforce;
25 import std.conv : to;
26 import std.random : randomCover, rndGen;
27 import std.range : zip;
28 import thrift.internal.algorithm : removeEqual;
29 
/**
 * A pool of resources, which can be iterated over, and where resources that
 * have failed too often can be temporarily disabled.
 *
 * Iteration happens through the range returned by opSlice(); faults and
 * successes are reported back via recordFault()/recordSuccess().
 *
 * This class is oblivious to the actual resource type managed.
 */
final class TResourcePool(Resource) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   resources = The initial members of the pool.
   */
  this(Resource[] resources) {
    resources_ = resources;
  }

  /**
   * Adds a resource to the pool.
   *
   * Ranges which have already been obtained via opSlice() keep iterating
   * over the resource list they were created with.
   */
  void add(Resource resource) {
    resources_ ~= resource;
  }

  /**
   * Removes a resource from the pool.
   *
   * Returns: Whether the resource could be found in the pool.
   */
  bool remove(Resource resource) {
    auto oldLength = resources_.length;
    resources_ = removeEqual(resources_, resource);
    return resources_.length < oldLength;
  }

  /**
   * Returns an »enriched« input range to iterate over the pool members.
   */
  struct Range {
    /**
     * Whether the range is empty.
     *
     * This is the case if all members of the pool have been popped (or skipped
     * because they were disabled) and TResourcePool.cycle is false, or there
     * is no element to return in cycle mode because all have been temporarily
     * disabled.
     *
     * As a side effect, the next usable resource (if any) is looked up and
     * cached for front(), and fault records whose disable timeout has expired
     * are removed from the parent pool.
     */
    bool empty() @property {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return true;

      // If we already got the next resource in the cache, it doesn't matter
      // whether there are more.
      if (cached_) return false;

      size_t examineCount;
      if (parent_.cycle) {
        // We want to check all the resources, but not iterate more than once
        // to avoid spinning in a loop if nothing is available.
        examineCount = resources_.length;
      } else {
        // When not in cycle mode, we just iterate the list exactly once. If all
        // items have been consumed, the interval below is empty.
        examineCount = resources_.length - nextIndex_;
      }

      foreach (i; 0 .. examineCount) {
        auto r = resources_[(nextIndex_ + i) % resources_.length];
        auto fi = r in parent_.faultInfos_;

        // A reset time still at its init value means that faults have been
        // recorded, but the resource has not been disabled (yet).
        if (fi && fi.resetTime != fi.resetTime.init) {
          // The argument to < needs to be an lvalue…
          auto currentTick = TickDuration.currSystemTick;
          if (fi.resetTime < currentTick) {
            // The timeout expired, remove the resource from the list and go
            // ahead trying it.
            parent_.faultInfos_.remove(r);
          } else {
            // The timeout didn't expire yet, try the next resource.
            continue;
          }
        }

        cache_ = r;
        cached_ = true;
        nextIndex_ = nextIndex_ + i + 1;
        return false;
      }

      // If we get here, all resources are currently inactive or the non-cycle
      // pool has been exhausted, so there is nothing we can do.
      nextIndex_ = nextIndex_ + examineCount;
      return true;
    }

    /**
     * Returns the first resource in the range.
     *
     * Throws: Exception (via enforce) if the range is empty.
     */
    Resource front() @property {
      enforce(!empty);
      return cache_;
    }

    /**
     * Removes the first resource from the range.
     *
     * Usually, this is combined with a call to TResourcePool.recordSuccess()
     * or recordFault().
     *
     * Throws: Exception (via enforce) if the range is empty.
     */
    void popFront() {
      enforce(!empty);
      cached_ = false;
    }

    /**
     * Returns whether the range will become non-empty at some point in the
     * future, and provides additional information when this will happen and
     * what will be the next resource.
     *
     * Makes only sense to call on empty ranges.
     *
     * Params:
     *   next = The next resource that will become available.
     *   waitTime = The duration until that resource will become available.
     *
     * Returns: false if the pool has no members or cycle mode is disabled
     *   (in which case the out parameters keep their init values), true
     *   otherwise.
     */
    bool willBecomeNonempty(out Resource next, out Duration waitTime) {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return false;

      // If cycle mode is not enabled, a range never becomes non-empty after
      // being empty once, because all the elements have already been
      // used/skipped in order to become empty.
      if (!parent_.cycle) return false;

      auto fi = parent_.faultInfos_;
      if (fi.length == 0) {
        // No faults are recorded at all, so nothing is disabled and a resource
        // is available right away (waitTime keeps its zero init value).
        // Searching the empty fault table below would access the front of an
        // empty range.
        next = cached_ ? cache_ : resources_[nextIndex_ % resources_.length];
        return true;
      }

      // The resource with the earliest reset time is the one which will be
      // re-enabled first.
      auto nextPair = minPos!"a[1].resetTime < b[1].resetTime"(
        zip(fi.keys, fi.values)
      ).front;

      next = nextPair[0];
      waitTime = to!Duration(nextPair[1].resetTime - TickDuration.currSystemTick);

      return true;
    }

  private:
    this(TResourcePool parent, Resource[] resources) {
      parent_ = parent;
      resources_ = resources;
    }

    /// The pool this range was created from, used for its settings and the
    /// fault records.
    TResourcePool parent_;

    /// All available resources. We keep a copy of it as to not get confused
    /// when resources are added to/removed from the parent pool.
    Resource[] resources_;

    /// After we have determined the next element in empty(), we store it here.
    Resource cache_;

    /// Whether there is currently something in the cache.
    bool cached_;

    /// The index to start searching from at the next call to empty().
    size_t nextIndex_;
  }

  /// Ditto
  Range opSlice() {
    auto res = resources_;
    if (permute) {
      res = array(randomCover(res, rndGen));
    }
    return Range(this, res);
  }

  /**
   * Records a success for an operation on the given resource, cancelling a
   * fault streak, if any.
   */
  void recordSuccess(Resource resource) {
    if (resource in faultInfos_) {
      faultInfos_.remove(resource);
    }
  }

  /**
   * Records a fault for the given resource.
   *
   * If a resource fails consecutively at least faultDisableCount times,
   * it is temporarily disabled (no longer considered) until
   * faultDisableDuration has passed. If faultDisableCount is zero, the fault
   * is still counted, but the resource is never disabled.
   */
  void recordFault(Resource resource) {
    auto fi = resource in faultInfos_;

    if (!fi) {
      faultInfos_[resource] = FaultInfo();
      fi = resource in faultInfos_;
    }

    // Saturate instead of wrapping around to zero, which would silently
    // restart a very long fault streak.
    if (fi.count < ushort.max) ++fi.count;

    // A limit of zero means »never disable«, so it must not be treated as a
    // threshold every resource has already reached.
    if (faultDisableCount != 0 && fi.count >= faultDisableCount) {
      // If the resource has hit the fault count limit, disable it for
      // specified duration.
      fi.resetTime = TickDuration.currSystemTick +
        TickDuration.from!"hnsecs"(faultDisableDuration.total!"hnsecs");
    }
  }

  /**
   * Whether to randomly permute the order of the resources in the pool when
   * taking a range using opSlice().
   *
   * This can be used e.g. as a simple form of load balancing.
   */
  bool permute = true;

  /**
   * Whether to keep iterating over the pool members after all have been
   * returned/have failed once.
   */
  bool cycle = false;

  /**
   * The number of consecutive faults after which a resource is disabled until
   * faultDisableDuration has passed. Zero to never disable resources.
   *
   * Defaults to zero.
   */
  ushort faultDisableCount = 0;

  /**
   * The duration for which a resource is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration = dur!"seconds"(1);

private:
  /// The current members of the pool.
  Resource[] resources_;

  /// Fault records for the resources with an ongoing fault streak.
  FaultInfo[Resource] faultInfos_;
}
274 
/// Per-resource fault bookkeeping, stored in TResourcePool.faultInfos_.
private struct FaultInfo {
  /// Number of faults recorded via recordFault() since the entry was created.
  ushort count;

  /// System tick after which a disabled resource may be used again. While
  /// this still has its init value, the resource is not treated as disabled.
  TickDuration resetTime;
}
281 
282 import std.datetime;
283 import thrift.base;
284 
unittest {
  import core.thread;

  auto a = new Object;
  auto b = new Object;
  auto c = new Object;
  auto objs = [a, b, c];
  auto pool = new TResourcePool!Object(objs);
  pool.permute = false;
  pool.faultDisableDuration = dur!"msecs"(5);
  Object dummyRes = void;
  Duration dummyDur = void;

  // Cancels any fault streak on all three resources.
  void clearAllFaults() {
    foreach (o; objs) pool.recordSuccess(o);
  }

  // Records a fault for a, b and c in that order, with a short pause in
  // between so that the reset times differ.
  void faultAllStaggered() {
    pool.recordFault(a);
    Thread.sleep(dur!"usecs"(1));
    pool.recordFault(b);
    Thread.sleep(dur!"usecs"(1));
    pool.recordFault(c);
  }

  // With a fault limit of two: a single fault and a fault followed by a
  // success must keep `a` enabled; the final two consecutive faults hit the
  // limit.
  void faultFirstUntilLimit() {
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
  }

  // Non-cycle mode: plain iteration visits every member exactly once.
  {
    auto range = pool[];

    foreach (expected; objs) {
      enforce(!range.empty);
      enforce(range.front == expected);
      range.popFront();
    }

    enforce(range.empty);
    enforce(!range.willBecomeNonempty(dummyRes, dummyDur));
  }

  // Non-cycle mode: a disabled resource is skipped.
  {
    pool.faultDisableCount = 2;
    faultFirstUntilLimit();

    auto range = pool[];
    enforce(range.front == b);
    range.popFront();
    enforce(range.front == c);
    range.popFront();
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(dummyRes, dummyDur));

    Thread.sleep(dur!"msecs"(5));
    // Not in cycle mode, has to be still empty after the timeouts expired.
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(dummyRes, dummyDur));

    clearAllFaults();
  }

  // Non-cycle mode: everything disabled yields an empty range right away.
  {
    pool.faultDisableCount = 1;
    faultAllStaggered();

    auto range = pool[];
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(dummyRes, dummyDur));

    clearAllFaults();
  }

  pool.cycle = true;

  // Cycle mode: iteration wraps around to the first member again.
  {
    auto range = pool[];

    foreach (round; 0 .. 2) {
      foreach (expected; objs) {
        enforce(!range.empty);
        enforce(range.front == expected);
        range.popFront();
      }
    }
  }

  // Cycle mode: a disabled resource is skipped until its timeout expires.
  {
    pool.faultDisableCount = 2;
    faultFirstUntilLimit();

    auto range = pool[];
    enforce(range.front == b);
    range.popFront();
    enforce(range.front == c);
    range.popFront();
    enforce(range.front == b);

    Thread.sleep(dur!"msecs"(5));

    range.popFront();
    enforce(range.front == c);

    range.popFront();
    enforce(range.front == a);

    enforce(pool[].front == a);

    clearAllFaults();
  }

  // Cycle mode: with everything disabled, the range reports which resource
  // will be re-enabled first and how long that takes.
  {
    pool.faultDisableCount = 1;
    faultAllStaggered();

    auto range = pool[];
    enforce(range.empty);

    Object nextRes;
    Duration nextWait;
    enforce(range.willBecomeNonempty(nextRes, nextWait));
    enforce(nextRes == a);
    enforce(nextWait > dur!"hnsecs"(0));

    clearAllFaults();
  }
}