/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.internal.resource_pool;

import core.time : Duration, dur, TickDuration;
import std.algorithm : minPos, reduce, remove;
import std.array : array, empty;
import std.exception : enforce;
import std.conv : to;
import std.random : randomCover, rndGen;
import std.range : zip;
import thrift.internal.algorithm : removeEqual;

/**
 * A pool of resources, which can be iterated over, and where resources that
 * have failed too often can be temporarily disabled.
 *
 * This class is oblivious to the actual resource type managed. Resources are
 * used as associative array keys and compared for equality, so the type has
 * to support both.
 */
final class TResourcePool(Resource) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   resources = The initial members of the pool.
   */
  this(Resource[] resources) {
    resources_ = resources;
  }

  /**
   * Adds a resource to the pool.
   *
   * Ranges that have already been obtained via opSlice() are not affected.
   */
  void add(Resource resource) {
    resources_ ~= resource;
  }

  /**
   * Removes a resource from the pool.
   *
   * Returns: Whether the resource could be found in the pool.
   */
  bool remove(Resource resource) {
    auto oldLength = resources_.length;
    resources_ = removeEqual(resources_, resource);
    return resources_.length < oldLength;
  }

  /**
   * Returns an »enriched« input range to iterate over the pool members.
   */
  struct Range {
    /**
     * Whether the range is empty.
     *
     * This is the case if all members of the pool have been popped (or skipped
     * because they were disabled) and TResourcePool.cycle is false, or there
     * is no element to return in cycle mode because all have been temporarily
     * disabled.
     *
     * Note that this is not a pure query: it looks up and caches the next
     * available resource, advances the internal position, and drops expired
     * fault records from the parent pool.
     */
    bool empty() @property {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return true;

      // If we already got the next resource in the cache, it doesn't matter
      // whether there are more.
      if (cached_) return false;

      size_t examineCount;
      if (parent_.cycle) {
        // We want to check all the resources, but not iterate more than once
        // to avoid spinning in a loop if nothing is available.
        examineCount = resources_.length;
      } else {
        // When not in cycle mode, we just iterate the list exactly once. If all
        // items have been consumed, there is nothing left to examine. The
        // comparison also covers nextIndex_ having run past the end (possible
        // if cycle mode was switched off while iterating), where the plain
        // subtraction would wrap around.
        examineCount = nextIndex_ < resources_.length ?
          resources_.length - nextIndex_ : 0;
      }

      foreach (i; 0 .. examineCount) {
        auto r = resources_[(nextIndex_ + i) % resources_.length];
        auto fi = r in parent_.faultInfos_;

        // A fault record without a reset time only counts faults; the resource
        // has not been disabled (yet).
        if (fi && fi.resetTime != fi.resetTime.init) {
          // The argument to < needs to be an lvalue…
          auto currentTick = TickDuration.currSystemTick;
          if (fi.resetTime < currentTick) {
            // The timeout expired, remove the resource from the list and go
            // ahead trying it.
            parent_.faultInfos_.remove(r);
          } else {
            // The timeout didn't expire yet, try the next resource.
            continue;
          }
        }

        cache_ = r;
        cached_ = true;
        nextIndex_ = nextIndex_ + i + 1;
        return false;
      }

      // If we get here, all resources are currently inactive or the non-cycle
      // pool has been exhausted, so there is nothing we can do.
      nextIndex_ = nextIndex_ + examineCount;
      return true;
    }

    /**
     * Returns the first resource in the range.
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    Resource front() @property {
      enforce(!empty);
      return cache_;
    }

    /**
     * Removes the first resource from the range.
     *
     * Usually, this is combined with a call to TResourcePool.recordSuccess()
     * or recordFault().
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    void popFront() {
      enforce(!empty);
      cached_ = false;
    }

    /**
     * Returns whether the range will become non-empty at some point in the
     * future, and provides additional information when this will happen and
     * what will be the next resource.
     *
     * Makes only sense to call on empty ranges.
     *
     * Params:
     *   next = The next resource that will become available.
     *   waitTime = The duration until that resource will become available.
     *
     * Returns: false if the range holds no resources at all or the parent
     *   pool is not in cycle mode (next and waitTime are then left at their
     *   initial values), true otherwise.
     */
    bool willBecomeNonempty(out Resource next, out Duration waitTime) {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return false;

      // If cycle mode is not enabled, a range never becomes non-empty after
      // being empty once, because all the elements have already been
      // used/skipped in order to become empty.
      if (!parent_.cycle) return false;

      // Pick the fault record with the earliest reset time; that resource is
      // the first one to be re-enabled.
      auto fi = parent_.faultInfos_;
      auto nextPair = minPos!"a[1].resetTime < b[1].resetTime"(
        zip(fi.keys, fi.values)
      ).front;

      next = nextPair[0];
      waitTime = to!Duration(nextPair[1].resetTime - TickDuration.currSystemTick);

      return true;
    }

  private:
    this(TResourcePool parent, Resource[] resources) {
      parent_ = parent;
      resources_ = resources;
    }

    /// The pool this range was taken from; consulted for the cycle setting
    /// and the fault records.
    TResourcePool parent_;

    /// All available resources. We keep a copy of it as to not get confused
    /// when resources are added to/removed from the parent pool.
    Resource[] resources_;

    /// After we have determined the next element in empty(), we store it here.
    Resource cache_;

    /// Whether there is currently something in the cache.
    bool cached_;

    /// The index to start searching from at the next call to empty().
    size_t nextIndex_;
  }

  /// Ditto
  Range opSlice() {
    Resource[] res;
    if (permute) {
      // array() materializes the random cover into a freshly allocated array.
      res = array(randomCover(resources_, rndGen));
    } else {
      // Hand out a private copy here as well, so that the range does not
      // alias the pool's own array when the pool is modified later.
      res = resources_.dup;
    }
    return Range(this, res);
  }

  /**
   * Records a success for an operation on the given resource, cancelling a
   * fault streak, if any.
   */
  void recordSuccess(Resource resource) {
    if (resource in faultInfos_) {
      faultInfos_.remove(resource);
    }
  }

  /**
   * Records a fault for the given resource.
   *
   * Once a resource has failed faultDisableCount times in a row, it is
   * temporarily disabled (no longer considered) until faultDisableDuration
   * has passed. Every further fault while the streak lasts pushes the reset
   * time out again. If faultDisableCount is zero, faults are counted but the
   * resource is never disabled.
   */
  void recordFault(Resource resource) {
    auto fi = resource in faultInfos_;

    if (!fi) {
      faultInfos_[resource] = FaultInfo();
      fi = resource in faultInfos_;
    }

    // Saturate instead of wrapping around, so that a very long fault streak
    // cannot drop back below the threshold.
    if (fi.count < ushort.max) ++fi.count;

    if (faultDisableCount > 0 && fi.count >= faultDisableCount) {
      // If the resource has hit the fault count limit, disable it for
      // specified duration.
      fi.resetTime = TickDuration.currSystemTick +
        TickDuration.from!"hnsecs"(faultDisableDuration.total!"hnsecs");
    }
  }

  /**
   * Whether to randomly permute the order of the resources in the pool when
   * taking a range using opSlice().
   *
   * This can be used e.g. as a simple form of load balancing.
   */
  bool permute = true;

  /**
   * Whether to keep iterating over the pool members after all have been
   * returned/have failed once.
   */
  bool cycle = false;

  /**
   * The number of consecutive faults after which a resource is disabled until
   * faultDisableDuration has passed. Zero to never disable resources.
   *
   * Defaults to zero.
   */
  ushort faultDisableCount = 0;

  /**
   * The duration for which a resource is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration = dur!"seconds"(1);

private:
  /// The current members of the pool.
  Resource[] resources_;

  /// Fault streak bookkeeping, only present for resources that have faulted
  /// since their last recorded success (or since their timeout expired).
  FaultInfo[Resource] faultInfos_;
}

private {
  /// Bookkeeping for a streak of consecutive faults of a single resource.
  struct FaultInfo {
    /// Number of consecutive faults recorded so far.
    ushort count;

    /// System tick after which the resource is considered again; left at its
    /// initial value as long as the resource has not been disabled.
    TickDuration resetTime;
  }
}

import std.datetime;
import thrift.base;

unittest {
  import core.thread;

  auto a = new Object;
  auto b = new Object;
  auto c = new Object;
  auto objs = [a, b, c];
  auto pool = new TResourcePool!Object(objs);
  pool.permute = false;
  pool.faultDisableDuration = dur!"msecs"(5);
  Object dummyRes = void;
  Duration dummyDur = void;

  {
    // A pool without any resources never yields anything, not even in cycle
    // mode.
    auto emptyPool = new TResourcePool!Object(null);
    emptyPool.permute = false;
    emptyPool.cycle = true;

    auto r = emptyPool[];
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));
  }

  {
    // With the default faultDisableCount of zero, faults never disable a
    // resource.
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
  }

  {
    auto r = pool[];

    foreach (i, o; objs) {
      enforce(!r.empty);
      enforce(r.front == o);
      r.popFront();
    }

    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));
  }

  {
    pool.faultDisableCount = 2;

    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);

    auto r = pool[];
    enforce(r.front == b);
    r.popFront();
    enforce(r.front == c);
    r.popFront();
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    Thread.sleep(dur!"msecs"(5));
    // Not in cycle mode, has to be still empty after the timeouts expired.
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    foreach (o; objs) pool.recordSuccess(o);
  }

  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    Thread.sleep(dur!"usecs"(1));
    pool.recordFault(b);
    Thread.sleep(dur!"usecs"(1));
    pool.recordFault(c);

    auto r = pool[];
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    foreach (o; objs) pool.recordSuccess(o);
  }

  pool.cycle = true;

  {
    auto r = pool[];

    foreach (o; objs ~ objs) {
      enforce(!r.empty);
      enforce(r.front == o);
      r.popFront();
    }
  }

  {
    pool.faultDisableCount = 2;

    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);

    auto r = pool[];
    enforce(r.front == b);
    r.popFront();
    enforce(r.front == c);
    r.popFront();
    enforce(r.front == b);

    Thread.sleep(dur!"msecs"(5));

    r.popFront();
    enforce(r.front == c);

    r.popFront();
    enforce(r.front == a);

    enforce(pool[].front == a);

    foreach (o; objs) pool.recordSuccess(o);
  }

  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    Thread.sleep(dur!"usecs"(1));
    pool.recordFault(b);
    Thread.sleep(dur!"usecs"(1));
    pool.recordFault(c);

    auto r = pool[];
    enforce(r.empty);

    Object nextRes;
    Duration nextWait;
    enforce(r.willBecomeNonempty(nextRes, nextWait));
    enforce(nextRes == a);
    enforce(nextWait > dur!"hnsecs"(0));

    foreach (o; objs) pool.recordSuccess(o);
  }
}