1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 /** 21 * Transports for reading from/writing to Thrift »log files«. 22 * 23 * These transports are not »stupid« sources and sinks just reading and 24 * writing bytes from a file verbatim, but organize the contents in the form 25 * of so-called »events«, which refers to the data written between two flush() 26 * calls. 27 * 28 * Chunking is supported, events are guaranteed to never span chunk boundaries. 29 * As a consequence, an event can never be larger than the chunk size. The 30 * chunk size used is not saved with the file, so care has to be taken to make 31 * sure the same chunk size is used for reading and writing. 32 */ 33 module thrift.transport.file; 34 35 import core.thread : Thread; 36 import std.array : empty; 37 import std.algorithm : min, max; 38 import std.concurrency; 39 import std.conv : to; 40 import std.datetime : AutoStart, dur, Duration, StopWatch; 41 import std.exception; 42 import std.stdio : File; 43 import thrift.base; 44 import thrift.transport.base; 45 46 /// The default chunk size, in bytes. 47 enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; 48 49 /// The type used to represent event sizes in the file. 
alias uint EventSize;

version (BigEndian) {
  static assert(false,
    "Little endian byte order is assumed in thrift.transport.file.");
}

/**
 * A transport used to read log files. It can never be written to, calling
 * write() throws.
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using is necessary to allow manually closing the file without relying on the
 * object lifetime. Otherwise, it's a straight port of the C++ implementation.
 */
final class TFileReaderTransport : TBaseTransport {
  /**
   * Creates a new file reader transport.
   *
   * The file is not touched until open() is called.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
  this(string path) {
    path_ = path;
    chunkSize_ = DEFAULT_CHUNK_SIZE;
    readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
    readTimeout_ = DEFAULT_READ_TIMEOUT;
    corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
    maxEventSize = DEFAULT_MAX_EVENT_SIZE;
    maxCorruptedEvents_ = DEFAULT_MAX_CORRUPTED_EVENTS;
  }

  /// Whether the underlying file is currently open.
  override bool isOpen() @property {
    return isOpen_;
  }

  /**
   * Checks whether there is data available to read, fetching the next event
   * from the file if none is currently being processed.
   *
   * Returns: true if at least one byte can be read, false if the transport
   *   is closed or no further event could be read.
   */
  override bool peek() {
    if (!isOpen) return false;
    if (!fetchCurrentEvent()) return false;

    // check if there is anything to read
    return (currentEvent_.length - currentEventPos_) > 0;
  }

  /**
   * Opens the file for binary reading. Does nothing if already open.
   *
   * Throws: TTransportException (NOT_OPEN) if the file cannot be opened.
   */
  override void open() {
    if (isOpen) return;
    try {
      file_ = File(path_, "rb");
    } catch (Exception e) {
      throw new TTransportException("Error on opening input file.",
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }
    isOpen_ = true;
  }

  /**
   * Closes the file and discards all read state, so that a later open()
   * starts reading from the beginning of the file again.
   */
  override void close() {
    if (!isOpen) return;

    file_.close();
    isOpen_ = false;
    readState_.resetAllValues();

    // A reopened file starts at position zero; do not carry over a partially
    // consumed event or the old file offset.
    currentEvent_ = null;
    currentEventPos_ = 0;
    offset_ = 0;
  }

  /**
   * Reads data from the current event into buf. A single call never crosses
   * an event boundary.
   *
   * Params:
   *   buf = Destination slice; at most buf.length bytes are written.
   *
   * Returns: The number of bytes copied, or zero if no event was available.
   *
   * Throws: TTransportException (NOT_OPEN) if the transport is not open.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));

    if (!fetchCurrentEvent()) return 0;

    auto len = buf.length;
    auto remaining = currentEvent_.length - currentEventPos_;

    if (remaining <= len) {
      // If less than the requested length is available, read as much as
      // possible.
      buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
      currentEvent_ = null;
      currentEventPos_ = 0;
      return remaining;
    }

    // There will still be data left in the buffer after reading, pass out len
    // bytes.
    buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
    currentEventPos_ += len;
    return len;
  }

  /**
   * Returns the number of chunks in the file, computed from the current file
   * size and the configured chunk size. Empty files have zero chunks.
   *
   * Throws: TTransportException if the transport is not open or the file
   *   size cannot be determined.
   */
  ulong getNumChunks() {
    enforce(isOpen, new TTransportException(
      "Cannot get number of chunks if file not open.",
      TTransportException.Type.NOT_OPEN));

    try {
      auto fileSize = file_.size();
      if (fileSize == 0) {
        // Empty files have no chunks.
        return 0;
      }
      return ((fileSize)/chunkSize_) + 1;
    } catch (Exception e) {
      throw new TTransportException("Error getting file size.", __FILE__,
        __LINE__, e);
    }
  }

  /// Returns the index of the chunk the current read offset lies in.
  ulong getCurChunk() {
    return offset_ / chunkSize_;
  }

  /**
   * Seeks to the start of the given chunk.
   *
   * Negative indices count from the end of the file. Indices before the first
   * chunk seek to the beginning; indices past the last chunk seek to the end
   * of the file, i.e. behind the last complete event present at the time of
   * the call.
   *
   * Params:
   *   chunk = Index of the chunk to seek to.
   *
   * Throws: TTransportException if the transport is not open or seeking
   *   fails.
   */
  void seekToChunk(long chunk) {
    enforce(isOpen, new TTransportException(
      "Cannot seek to chunk if file not open.",
      TTransportException.Type.NOT_OPEN));

    auto numChunks = getNumChunks();

    if (chunk < 0) {
      // Count negative indices from the end.
      chunk += numChunks;
    }

    if (chunk < 0) {
      logError("Incorrect chunk number for reverse seek, seeking to " ~
       "beginning instead: %s", chunk);
      chunk = 0;
    }

    bool seekToEnd;
    long minEndOffset;
    if (chunk >= numChunks) {
      logError("Trying to seek to non-existing chunk, seeking to " ~
       "end of file instead: %s", chunk);
      seekToEnd = true;
      // An empty file has no chunks; numChunks - 1 would wrap around and
      // yield a negative seek offset, so stay at the beginning in that case.
      chunk = (numChunks > 0) ? numChunks - 1 : 0;
      // this is the min offset to process events till
      minEndOffset = file_.size();
    }

    // Drop everything buffered so far, including the read position inside a
    // partially consumed event (otherwise it would be applied to the next
    // event read after the seek).
    readState_.resetAllValues();
    currentEvent_ = null;
    currentEventPos_ = 0;

    try {
      file_.seek(chunk * chunkSize_);
      offset_ = chunk * chunkSize_;
    } catch (Exception e) {
      throw new TTransportException("Error seeking to chunk", __FILE__,
        __LINE__, e);
    }

    if (seekToEnd) {
      // Never wait on the end of the file for new content, we just want to
      // find the last one.
      auto oldReadTimeout = readTimeout_;
      scope (exit) readTimeout_ = oldReadTimeout;
      readTimeout_ = dur!"hnsecs"(0);

      // Keep on reading until the last event at point of seekToChunk call.
      while ((offset_ + readState_.bufferPos_) < minEndOffset) {
        if (readEvent() is null) {
          break;
        }
      }
    }
  }

  /// Seeks behind the last event currently present in the file.
  void seekToEnd() {
    seekToChunk(getNumChunks());
  }

  /**
   * The size of the chunks the file is divided into, in bytes.
   *
   * Can only be changed while the transport is closed and must be large
   * enough to hold the event size field plus at least one byte of payload.
   */
  ulong chunkSize() @property const {
    return chunkSize_;
  }

  /// ditto
  void chunkSize(ulong value) @property {
    enforce(!isOpen, new TTransportException(
      "Cannot set chunk size after TFileReaderTransport has been opened."));
    enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
      "be large enough to accomodate at least a single byte of payload data."));
    chunkSize_ = value;
  }

  /**
   * If positive, wait the specified duration for new data when arriving at
   * end of file. If negative, wait forever (tailing mode), waking up to check
   * in the specified interval. If zero, do not wait at all.
   *
   * Defaults to 500 ms.
   */
  Duration readTimeout() @property const {
    return readTimeout_;
  }

  /// ditto
  void readTimeout(Duration value) @property {
    readTimeout_ = value;
  }

  /// ditto
  enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);

  /**
   * Read buffer size, in bytes.
   *
   * Once the buffer has been allocated by the first read it can only be
   * enlarged: shrinking it could cut off data that has been read from the
   * file but not yet been processed.
   *
   * Defaults to 1 MiB.
   */
  size_t readBufferSize() @property const {
    return readBufferSize_;
  }

  /// ditto
  void readBufferSize(size_t value) @property {
    if (readBuffer_) {
      enforce(value >= readBufferSize_,
        "Cannot shrink read buffer after first read.");
      readBuffer_.length = value;
    }
    readBufferSize_ = value;
  }

  /// ditto
  enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;

  /**
   * Arbitrary event size limit, in bytes. Must be smaller than chunk size.
   *
   * Defaults to zero (no limit).
   */
  size_t maxEventSize() @property const {
    return maxEventSize_;
  }

  /// ditto
  void maxEventSize(size_t value) @property {
    enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
      "mutiple chunks, maxEventSize must be smaller than chunk size.");
    maxEventSize_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_EVENT_SIZE = 0;

  /**
   * The interval at which the thread wakes up to check for the next chunk
   * in tailing mode (used while recovering from a corrupted event in the
   * last chunk of the file).
   *
   * Defaults to one second.
   */
  Duration corruptedEventSleepDuration() @property const {
    return corruptedEventSleepDuration_;
  }

  /// ditto
  void corruptedEventSleepDuration(Duration value) @property {
    corruptedEventSleepDuration_ = value;
  }

  /// ditto
  enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);

  /**
   * The maximum number of corrupted events tolerated before the whole chunk
   * is skipped.
   *
   * Defaults to zero.
   */
  uint maxCorruptedEvents() @property const {
    return maxCorruptedEvents_;
  }

  /// ditto
  void maxCorruptedEvents(uint value) @property {
    maxCorruptedEvents_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;

private:
  /*
   * Makes sure currentEvent_ holds an event, reading the next one from the
   * file if there is none. Returns false if no event could be read.
   */
  bool fetchCurrentEvent() {
    if (!currentEvent_) {
      currentEvent_ = readEvent();

      if (!currentEvent_) {
        // Still nothing there, couldn't read a new event.
        return false;
      }
    }
    return true;
  }

  /*
   * Reads the next complete event from the file, refilling the read buffer
   * as needed and honoring readTimeout_ at the end of the file.
   *
   * Returns the event payload, or null if the end of the file was reached
   * and no (further) waiting is configured.
   */
  ubyte[] readEvent() {
    if (!readBuffer_) {
      readBuffer_ = new ubyte[readBufferSize_];
    }

    bool timeoutExpired;
    while (1) {
      // read from the file if read buffer is exhausted
      if (readState_.bufferPos_ == readState_.bufferLen_) {
        // advance the offset pointer
        offset_ += readState_.bufferLen_;

        try {
          // Need to clear eof flag before reading, otherwise tailing a file
          // does not work.
          file_.clearerr();

          auto usedBuf = file_.rawRead(readBuffer_);
          readState_.bufferLen_ = usedBuf.length;
        } catch (Exception e) {
          readState_.resetAllValues();
          throw new TTransportException("Error while reading from file",
            __FILE__, __LINE__, e);
        }

        readState_.bufferPos_ = 0;
        readState_.lastDispatchPos_ = 0;

        if (readState_.bufferLen_ == 0) {
          // Reached end of file.
          if (readTimeout_ < dur!"hnsecs"(0)) {
            // Tailing mode, sleep for the specified duration and try again.
            Thread.sleep(-readTimeout_);
            continue;
          } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
            // Either no timeout set, or it has already expired.
            readState_.resetState(0);
            return null;
          } else {
            // Timeout mode, sleep for the specified amount of time and retry.
            Thread.sleep(readTimeout_);
            timeoutExpired = true;
            continue;
          }
        }
      }

      // Attempt to read an event from the buffer.
      while (readState_.bufferPos_ < readState_.bufferLen_) {
        if (readState_.readingSize_) {
          if (readState_.eventSizeBuffPos_ == 0) {
            // A size field never straddles a chunk boundary; if it would,
            // the current byte is padding at the end of the chunk.
            if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
              ((offset_ + readState_.bufferPos_ + EventSize.sizeof - 1)/chunkSize_))
            {
              readState_.bufferPos_++;
              continue;
            }
          }

          readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
            readBuffer_[readState_.bufferPos_++];

          if (readState_.eventSizeBuffPos_ == EventSize.sizeof) {
            auto size = (cast(EventSize[])readState_.eventSizeBuff_)[0];

            if (size == 0) {
              // This is part of the zero padding between chunks.
              readState_.resetState(readState_.lastDispatchPos_);
              continue;
            }

            // got a valid event
            readState_.readingSize_ = false;
            readState_.eventLen_ = size;
            readState_.eventPos_ = 0;

            // check if the event is corrupted and perform recovery if required
            if (isEventCorrupted()) {
              performRecovery();
              // start from the top
              break;
            }
          }
        } else {
          if (!readState_.event_) {
            readState_.event_ = new ubyte[readState_.eventLen_];
          }

          // take either the entire event or the remaining bytes in the buffer
          auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
            readState_.eventLen_ - readState_.eventPos_);

          // copy data from read buffer into event buffer
          readState_.event_[
            readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
          ] = readBuffer_[
            readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
          ];

          // increment position ptrs
          readState_.eventPos_ += reclaimBuffer;
          readState_.bufferPos_ += reclaimBuffer;

          // check if the event has been read in full
          if (readState_.eventPos_ == readState_.eventLen_) {
            // Reset the read state and return the completed event.
            auto completeEvent = readState_.event_;
            readState_.event_ = null;
            readState_.resetState(readState_.bufferPos_);
            return completeEvent;
          }
        }
      }
    }
  }

  /*
   * Sanity-checks the size of the event whose size field has just been read
   * (readState_.bufferPos_ points at the first payload byte).
   */
  bool isEventCorrupted() {
    if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
      // Event size is larger than user-specified max-event size
      logError("Corrupt event read: Event size (%s) greater than max " ~
        "event size (%s)", readState_.eventLen_, maxEventSize_);
      return true;
    } else if (readState_.eventLen_ > chunkSize_) {
      // Event size is larger than chunk size
      logError("Corrupt event read: Event size (%s) greater than chunk " ~
        "size (%s)", readState_.eventLen_, chunkSize_);
      return true;
    } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
      ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - 1) / chunkSize_))
    {
      // The first byte of the size field and the last payload byte lie in
      // different chunks: size indicates that event crosses chunk boundary.
      logError("Read corrupt event. Event crosses chunk boundary. " ~
        "Event size: %s. Offset: %s", readState_.eventLen_,
        (offset_ + readState_.bufferPos_ + EventSize.sizeof)
      );

      return true;
    }

    return false;
  }

  /*
   * Called after a corrupted event has been detected. Retries the current
   * chunk up to maxCorruptedEvents_ times, then skips to the next chunk
   * (waiting for it in tailing mode) or throws if there is none.
   */
  void performRecovery() {
    auto curChunk = getCurChunk();
    if (lastBadChunk_ == curChunk) {
      numCorruptedEventsInChunk_++;
    } else {
      lastBadChunk_ = curChunk;
      numCorruptedEventsInChunk_ = 1;
    }

    if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
      // maybe there was an error in reading the file from disk
      // seek to the beginning of chunk and try again
      seekToChunk(curChunk);
    } else {
      // Just skip ahead to the next chunk if we not already at the last chunk.
      if (curChunk != (getNumChunks() - 1)) {
        seekToChunk(curChunk + 1);
      } else if (readTimeout_ < dur!"hnsecs"(0)) {
        // We are in tailing mode, wait until there is enough data to start
        // the next chunk.
        while(curChunk == (getNumChunks() - 1)) {
          Thread.sleep(corruptedEventSleepDuration_);
        }
        seekToChunk(curChunk + 1);
      } else {
        // Pretty hosed at this stage, rewind the file back to the last
        // successful point and punt on the error.
        readState_.resetState(readState_.lastDispatchPos_);
        currentEvent_ = null;
        currentEventPos_ = 0;

        throw new TTransportException("File corrupted at offset: " ~
          to!string(offset_ + readState_.lastDispatchPos_),
          TTransportException.Type.CORRUPTED_DATA);
      }
    }
  }

  string path_;
  File file_;
  bool isOpen_;
  // File offset of the first byte in the read buffer.
  long offset_;
  // Event currently handed out via read(), and the read position inside it.
  ubyte[] currentEvent_;
  size_t currentEventPos_;
  ulong chunkSize_;
  Duration readTimeout_;
  size_t maxEventSize_;

  // Read buffer – lazily allocated on the first read().
  ubyte[] readBuffer_;
  size_t readBufferSize_;

  static struct ReadState {
    ubyte[] event_;
    size_t eventLen_;
    size_t eventPos_;

    // keep track of event size
    ubyte[EventSize.sizeof] eventSizeBuff_;
    ubyte eventSizeBuffPos_;
    bool readingSize_ = true;

    // read buffer variables
    size_t bufferPos_;
    size_t bufferLen_;

    // last successful dispatch point
    size_t lastDispatchPos_;

    // Prepares for reading the size field of the next event.
    void resetState(size_t lastDispatchPos) {
      readingSize_ = true;
      eventSizeBuffPos_ = 0;
      lastDispatchPos_ = lastDispatchPos;
    }

    // Additionally forgets the buffered data and any partially read event.
    void resetAllValues() {
      resetState(0);
      bufferPos_ = 0;
      bufferLen_ = 0;
      event_ = null;
    }
  }
  ReadState readState_;

  ulong lastBadChunk_;
  uint maxCorruptedEvents_;
  uint numCorruptedEventsInChunk_;
  Duration corruptedEventSleepDuration_;
}

/**
 * A transport used to write log files. It can never be read from, calling
 * read() throws.
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using is necessary to allow manually closing the file without relying on the
 * object lifetime.
590 */ 591 final class TFileWriterTransport : TBaseTransport { 592 /** 593 * Creates a new file writer transport. 594 * 595 * Params: 596 * path = Path of the file to opperate on. 597 */ 598 this(string path) { 599 path_ = path; 600 601 chunkSize_ = DEFAULT_CHUNK_SIZE; 602 eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE; 603 ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION; 604 maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES; 605 maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL; 606 } 607 608 override bool isOpen() @property { 609 return isOpen_; 610 } 611 612 /** 613 * A file writer transport can never be read from. 614 */ 615 override bool peek() { 616 return false; 617 } 618 619 override void open() { 620 if (isOpen) return; 621 622 writerThread_ = spawn( 623 &writerThread, 624 path_, 625 chunkSize_, 626 maxFlushBytes_, 627 maxFlushInterval_, 628 ioErrorSleepDuration_ 629 ); 630 setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block); 631 isOpen_ = true; 632 } 633 634 /** 635 * Closes the transport, i.e. the underlying file and the writer thread. 636 */ 637 override void close() { 638 if (!isOpen) return; 639 640 prioritySend(writerThread_, ShutdownMessage(), thisTid); // FIXME: Should use normal send here. 641 receive((ShutdownMessage msg, Tid tid){}); 642 isOpen_ = false; 643 } 644 645 /** 646 * Enqueues the passed slice of data for writing and immediately returns. 647 * write() only blocks if the event buffer has been exhausted. 648 * 649 * The transport must be open when calling this. 650 * 651 * Params: 652 * buf = Slice of data to write. 
653 */ 654 override void write(in ubyte[] buf) { 655 enforce(isOpen, new TTransportException( 656 "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN)); 657 658 if (buf.empty) { 659 logError("Cannot write empty event, skipping."); 660 return; 661 } 662 663 auto maxSize = chunkSize - EventSize.sizeof; 664 enforce(buf.length <= maxSize, new TTransportException( 665 "Cannot write more than " ~ to!string(maxSize) ~ 666 "bytes at once due to chunk size.")); 667 668 send(writerThread_, buf.idup); 669 } 670 671 /** 672 * Flushes any pending data to be written. 673 * 674 * The transport must be open when calling this. 675 * 676 * Throws: TTransportException if an error occurs. 677 */ 678 override void flush() { 679 enforce(isOpen, new TTransportException( 680 "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN)); 681 682 send(writerThread_, FlushMessage(), thisTid); 683 receive((FlushMessage msg, Tid tid){}); 684 } 685 686 /** 687 * The size of the chunks the file is divided into, in bytes. 688 * 689 * A single event (write call) never spans multiple chunks – this 690 * effectively limits the event size to chunkSize - EventSize.sizeof. 691 */ 692 ulong chunkSize() @property { 693 return chunkSize_; 694 } 695 696 /// ditto 697 void chunkSize(ulong value) @property { 698 enforce(!isOpen, new TTransportException( 699 "Cannot set chunk size after TFileWriterTransport has been opened.")); 700 chunkSize_ = value; 701 } 702 703 /** 704 * The maximum number of write() calls buffered, or zero for no limit. 705 * 706 * If the buffer is exhausted, write() will block until space becomes 707 * available. 
708 */ 709 size_t eventBufferSize() @property { 710 return eventBufferSize_; 711 } 712 713 /// ditto 714 void eventBufferSize(size_t value) @property { 715 eventBufferSize_ = value; 716 if (isOpen) { 717 setMaxMailboxSize(writerThread_, value, OnCrowding.throwException); 718 } 719 } 720 721 /// ditto 722 enum DEFAULT_EVENT_BUFFER_SIZE = 10_000; 723 724 /** 725 * Maximum number of bytes buffered before writing and flushing the file 726 * to disk. 727 * 728 * Currently cannot be set after the first call to write(). 729 */ 730 size_t maxFlushBytes() @property { 731 return maxFlushBytes_; 732 } 733 734 /// ditto 735 void maxFlushBytes(size_t value) @property { 736 maxFlushBytes_ = value; 737 if (isOpen) { 738 send(writerThread_, FlushBytesMessage(value)); 739 } 740 } 741 742 /// ditto 743 enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024; 744 745 /** 746 * Maximum interval between flushing the file to disk. 747 * 748 * Currenlty cannot be set after the first call to write(). 749 */ 750 Duration maxFlushInterval() @property { 751 return maxFlushInterval_; 752 } 753 754 /// ditto 755 void maxFlushInterval(Duration value) @property { 756 maxFlushInterval_ = value; 757 if (isOpen) { 758 send(writerThread_, FlushIntervalMessage(value)); 759 } 760 } 761 762 /// ditto 763 enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3); 764 765 /** 766 * When the writer thread encounteres an I/O error, it goes pauses for a 767 * short time before trying to reopen the output file. This controls the 768 * sleep duration. 
769 */ 770 Duration ioErrorSleepDuration() @property { 771 return ioErrorSleepDuration_; 772 } 773 774 /// ditto 775 void ioErrorSleepDuration(Duration value) @property { 776 ioErrorSleepDuration_ = value; 777 if (isOpen) { 778 send(writerThread_, FlushIntervalMessage(value)); 779 } 780 } 781 782 /// ditto 783 enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500); 784 785 private: 786 string path_; 787 ulong chunkSize_; 788 size_t eventBufferSize_; 789 Duration ioErrorSleepDuration_; 790 size_t maxFlushBytes_; 791 Duration maxFlushInterval_; 792 bool isOpen_; 793 Tid writerThread_; 794 } 795 796 private { 797 // Signals that the file should be flushed on disk. Sent to the writer 798 // thread and sent back along with the tid for confirmation. 799 struct FlushMessage {} 800 801 // Signals that the writer thread should close the file and shut down. Sent 802 // to the writer thread and sent back along with the tid for confirmation. 803 struct ShutdownMessage {} 804 805 struct FlushBytesMessage { 806 size_t value; 807 } 808 809 struct FlushIntervalMessage { 810 Duration value; 811 } 812 813 struct IoErrorSleepDurationMessage { 814 Duration value; 815 } 816 817 void writerThread( 818 string path, 819 ulong chunkSize, 820 size_t maxFlushBytes, 821 Duration maxFlushInterval, 822 Duration ioErrorSleepDuration 823 ) { 824 bool errorOpening; 825 File file; 826 ulong offset; 827 try { 828 // Open file in appending and binary mode. 
829 file = File(path, "ab"); 830 offset = file.tell(); 831 } catch (Exception e) { 832 logError("Error on opening output file in writer thread: %s", e); 833 errorOpening = true; 834 } 835 836 auto flushTimer = StopWatch(AutoStart.yes); 837 size_t unflushedByteCount; 838 839 Tid shutdownRequestTid; 840 bool shutdownRequested; 841 while (true) { 842 if (shutdownRequested) break; 843 844 bool forceFlush; 845 Tid flushRequestTid; 846 receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()), 847 (immutable(ubyte)[] data) { 848 while (errorOpening) { 849 logError("Writer thread going to sleep for %s µs due to IO errors", 850 ioErrorSleepDuration.fracSec.usecs); 851 852 // Sleep for ioErrorSleepDuration, being ready to be interrupted 853 // by shutdown requests. 854 auto timedOut = receiveTimeout(ioErrorSleepDuration, 855 (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; }); 856 if (!timedOut) { 857 // We got a shutdown request, just drop all events and exit the 858 // main loop as to not block application shutdown with our tries 859 // which we must assume to fail. 860 break; 861 } 862 863 try { 864 file = File(path, "ab"); 865 unflushedByteCount = 0; 866 errorOpening = false; 867 logError("Output file %s reopened during writer thread error " ~ 868 "recovery", path); 869 } catch (Exception e) { 870 logError("Unable to reopen output file %s during writer " ~ 871 "thread error recovery", path); 872 } 873 } 874 875 // Make sure the event does not cross the chunk boundary by writing 876 // a padding consisting of zeroes if it would. 877 auto chunk1 = offset / chunkSize; 878 auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize; 879 880 if (chunk1 != chunk2) { 881 // TODO: The C++ implementation refetches the offset here to »keep 882 // in sync« – why would this be needed? 
883 auto padding = cast(size_t) 884 ((((offset / chunkSize) + 1) * chunkSize) - offset); 885 auto zeroes = new ubyte[padding]; 886 file.rawWrite(zeroes); 887 unflushedByteCount += padding; 888 offset += padding; 889 } 890 891 // TODO: 2 syscalls here, is this a problem performance-wise? 892 // Probably abysmal performance on Windows due to rawWrite 893 // implementation. 894 uint len = cast(uint)data.length; 895 file.rawWrite(cast(ubyte[])(&len)[0..1]); 896 file.rawWrite(data); 897 898 auto bytesWritten = EventSize.sizeof + data.length; 899 unflushedByteCount += bytesWritten; 900 offset += bytesWritten; 901 }, (FlushBytesMessage msg) { 902 maxFlushBytes = msg.value; 903 }, (FlushIntervalMessage msg) { 904 maxFlushInterval = msg.value; 905 }, (IoErrorSleepDurationMessage msg) { 906 ioErrorSleepDuration = msg.value; 907 }, (FlushMessage msg, Tid tid) { 908 forceFlush = true; 909 flushRequestTid = tid; 910 }, (OwnerTerminated msg) { 911 shutdownRequested = true; 912 }, (ShutdownMessage msg, Tid tid) { 913 shutdownRequested = true; 914 shutdownRequestTid = tid; 915 } 916 ); 917 918 if (errorOpening) continue; 919 920 bool flush; 921 if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) { 922 flush = true; 923 } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) { 924 if (unflushedByteCount == 0) { 925 // If the flush timer is due, but no data has been written, don't 926 // needlessly fsync, but do reset the timer. 
927 flushTimer.reset(); 928 } else { 929 flush = true; 930 } 931 } 932 933 if (flush) { 934 file.flush(); 935 flushTimer.reset(); 936 unflushedByteCount = 0; 937 if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid); 938 } 939 } 940 941 file.close(); 942 943 if (shutdownRequestTid != Tid.init) { 944 send(shutdownRequestTid, ShutdownMessage(), thisTid); 945 } 946 } 947 } 948 949 version (unittest) { 950 import core.memory : GC; 951 import std.file; 952 } 953 954 unittest { 955 void tryRemove(string fileName) { 956 try { 957 remove(fileName); 958 } catch (Exception) {} 959 } 960 961 immutable fileName = "unittest.dat.tmp"; 962 enforce(!exists(fileName), "Unit test output file " ~ fileName ~ 963 " already exists."); 964 965 /* 966 * Check the most basic reading/writing operations. 967 */ 968 { 969 scope (exit) tryRemove(fileName); 970 971 auto writer = new TFileWriterTransport(fileName); 972 writer.open(); 973 scope (exit) writer.close(); 974 975 writer.write([1, 2]); 976 writer.write([3, 4]); 977 writer.write([5, 6, 7]); 978 writer.flush(); 979 980 auto reader = new TFileReaderTransport(fileName); 981 reader.open(); 982 scope (exit) reader.close(); 983 984 auto buf = new ubyte[7]; 985 reader.readAll(buf); 986 enforce(buf == [1, 2, 3, 4, 5, 6, 7]); 987 } 988 989 /* 990 * Check that chunking works as expected. 991 */ 992 { 993 scope (exit) tryRemove(fileName); 994 995 static assert(EventSize.sizeof == 4); 996 enum CHUNK_SIZE = 10; 997 998 // Write some contents to the file. 999 { 1000 auto writer = new TFileWriterTransport(fileName); 1001 writer.chunkSize = CHUNK_SIZE; 1002 writer.open(); 1003 scope (exit) writer.close(); 1004 1005 writer.write([0xde]); 1006 writer.write([0xad]); 1007 // Chunk boundary here. 1008 writer.write([0xbe]); 1009 // The next write doesn't fit in the five bytes remaining, so we expect 1010 // padding zero bytes to be written. 
1011 writer.write([0xef, 0x12]); 1012 1013 try { 1014 writer.write(new ubyte[CHUNK_SIZE]); 1015 enforce(false, "Could write event not fitting in a single chunk."); 1016 } catch (TTransportException e) {} 1017 1018 writer.flush(); 1019 } 1020 1021 // Check the raw contents of the file to see if chunk padding was written 1022 // as expected. 1023 auto file = File(fileName, "r"); 1024 enforce(file.size == 26); 1025 auto written = new ubyte[26]; 1026 file.rawRead(written); 1027 enforce(written == [ 1028 1, 0, 0, 0, 0xde, 1029 1, 0, 0, 0, 0xad, 1030 1, 0, 0, 0, 0xbe, 1031 0, 0, 0, 0, 0, 1032 2, 0, 0, 0, 0xef, 0x12 1033 ]); 1034 1035 // Read the data back in, getting all the events at once. 1036 { 1037 auto reader = new TFileReaderTransport(fileName); 1038 reader.chunkSize = CHUNK_SIZE; 1039 reader.open(); 1040 scope (exit) reader.close(); 1041 1042 auto buf = new ubyte[5]; 1043 reader.readAll(buf); 1044 enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]); 1045 } 1046 } 1047 1048 /* 1049 * Make sure that close() exits "quickly", i.e. that there is no problem 1050 * with the worker thread waking up. 1051 */ 1052 { 1053 import std.conv : text; 1054 enum NUM_ITERATIONS = 1000; 1055 1056 uint numOver = 0; 1057 foreach (n; 0 .. NUM_ITERATIONS) { 1058 scope (exit) tryRemove(fileName); 1059 1060 auto transport = new TFileWriterTransport(fileName); 1061 transport.open(); 1062 1063 // Write something so that the writer thread gets started. 1064 transport.write(cast(ubyte[])"foo"); 1065 1066 // Every other iteration, also call flush(), just in case that potentially 1067 // has any effect on how the writer thread wakes up. 1068 if (n & 0x1) { 1069 transport.flush(); 1070 } 1071 1072 // Time the call to close(). 1073 auto sw = StopWatch(AutoStart.yes); 1074 transport.close(); 1075 sw.stop(); 1076 1077 // If any attempt takes more than 500ms, treat that as a fatal failure to 1078 // avoid looping over a potentially very slow operation. 
1079 enforce(sw.peek().msecs < 500, 1080 text("close() took ", sw.peek().msecs, "ms.")); 1081 1082 // Normally, it takes less than 5ms on my dev box. 1083 // However, if the box is heavily loaded, some of the test runs can take 1084 // longer. Additionally, on a Windows Server 2008 instance running in 1085 // a VirtualBox VM, it has been observed that about a quarter of the runs 1086 // takes (217 ± 1) ms, for reasons not yet known. 1087 if (sw.peek().msecs > 5) { 1088 ++numOver; 1089 } 1090 1091 // Force garbage collection runs every now and then to make sure we 1092 // don't run out of OS thread handles. 1093 if (!(n % 100)) GC.collect(); 1094 } 1095 1096 // Make sure fewer than a third of the runs took longer than 5ms. 1097 enforce(numOver < NUM_ITERATIONS / 3, 1098 text(numOver, " iterations took more than 10 ms.")); 1099 } 1100 }