1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
21  * Transports for reading from/writing to Thrift »log files«.
22  *
23  * These transports are not »stupid« sources and sinks just reading and
24  * writing bytes from a file verbatim, but organize the contents in the form
25  * of so-called »events«, which refers to the data written between two flush()
26  * calls.
27  *
 * Chunking is supported: events are guaranteed never to span chunk boundaries.
 * As a consequence, an event can never be larger than the chunk size. The
 * chunk size used is not saved with the file, so care has to be taken to make
 * sure the same chunk size is used for reading and writing.
32  */
33 module thrift.transport.file;
34 
35 import core.thread : Thread;
36 import std.array : empty;
37 import std.algorithm : min, max;
38 import std.concurrency;
39 import std.conv : to;
40 import std.datetime : AutoStart, dur, Duration, StopWatch;
41 import std.exception;
42 import std.stdio : File;
43 import thrift.base;
44 import thrift.transport.base;
45 
/// Default size of a single chunk, in bytes (16 MiB).
enum DEFAULT_CHUNK_SIZE = 1024 * 1024 * 16;

/// Integer type of the length prefix stored in front of every event.
alias uint EventSize;

// Event length prefixes are written and read as raw in-memory integers in
// this module, so the on-disk format silently depends on host byte order.
version (BigEndian) static assert(false,
  "Little endian byte order is assumed in thrift.transport.file.");
56 
57 /**
58  * A transport used to read log files. It can never be written to, calling
59  * write() throws.
60  *
61  * Contrary to the C++ design, explicitly opening the transport/file before
62  * using is necessary to allow manually closing the file without relying on the
63  * object lifetime. Otherwise, it's a straight port of the C++ implementation.
64  */
65 final class TFileReaderTransport : TBaseTransport {
66   /**
67    * Creates a new file writer transport.
68    *
69    * Params:
70    *   path = Path of the file to opperate on.
71    */
72   this(string path) {
73     path_ = path;
74     chunkSize_ = DEFAULT_CHUNK_SIZE;
75     readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
76     readTimeout_ = DEFAULT_READ_TIMEOUT;
77     corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
78     maxEventSize = DEFAULT_MAX_EVENT_SIZE;
79   }
80 
81   override bool isOpen() @property {
82     return isOpen_;
83   }
84 
85   override bool peek() {
86     if (!isOpen) return false;
87 
88     // If there is no event currently processed, try fetching one from the
89     // file.
90     if (!currentEvent_) {
91       currentEvent_ = readEvent();
92 
93       if (!currentEvent_) {
94         // Still nothing there, couldn't read a new event.
95         return false;
96       }
97     }
98     // check if there is anything to read
99     return (currentEvent_.length - currentEventPos_) > 0;
100   }
101 
102   override void open() {
103     if (isOpen) return;
104     try {
105       file_ = File(path_, "rb");
106     } catch (Exception e) {
107       throw new TTransportException("Error on opening input file.",
108         TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
109     }
110     isOpen_ = true;
111   }
112 
113   override void close() {
114     if (!isOpen) return;
115 
116     file_.close();
117     isOpen_ = false;
118     readState_.resetAllValues();
119   }
120 
121   override size_t read(ubyte[] buf) {
122     enforce(isOpen, new TTransportException(
123       "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));
124 
125     // If there is no event currently processed, try fetching one from the
126     // file.
127     if (!currentEvent_) {
128       currentEvent_ = readEvent();
129 
130       if (!currentEvent_) {
131         // Still nothing there, couldn't read a new event.
132         return 0;
133       }
134     }
135 
136     auto len = buf.length;
137     auto remaining = currentEvent_.length - currentEventPos_;
138 
139     if (remaining <= len) {
140       // If less than the requested length is available, read as much as
141       // possible.
142       buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
143       currentEvent_ = null;
144       currentEventPos_ = 0;
145       return remaining;
146     }
147 
148     // There will still be data left in the buffer after reading, pass out len
149     // bytes.
150     buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
151     currentEventPos_ += len;
152     return len;
153   }
154 
155   ulong getNumChunks() {
156     enforce(isOpen, new TTransportException(
157       "Cannot get number of chunks if file not open.",
158       TTransportException.Type.NOT_OPEN));
159 
160     try {
161       auto fileSize = file_.size();
162       if (fileSize == 0) {
163         // Empty files have no chunks.
164         return 0;
165       }
166       return ((fileSize)/chunkSize_) + 1;
167     } catch (Exception e) {
168       throw new TTransportException("Error getting file size.", __FILE__,
169         __LINE__, e);
170     }
171   }
172 
173   ulong getCurChunk() {
174     return offset_ / chunkSize_;
175   }
176 
177   void seekToChunk(long chunk) {
178     enforce(isOpen, new TTransportException(
179       "Cannot get number of chunks if file not open.",
180       TTransportException.Type.NOT_OPEN));
181 
182     auto numChunks = getNumChunks();
183 
184     if (chunk < 0) {
185       // Count negative indices from the end.
186       chunk += numChunks;
187     }
188 
189     if (chunk < 0) {
190       logError("Incorrect chunk number for reverse seek, seeking to " ~
191        "beginning instead: %s", chunk);
192       chunk = 0;
193     }
194 
195     bool seekToEnd;
196     long minEndOffset;
197     if (chunk >= numChunks) {
198       logError("Trying to seek to non-existing chunk, seeking to " ~
199        "end of file instead: %s", chunk);
200       seekToEnd = true;
201       chunk = numChunks - 1;
202       // this is the min offset to process events till
203       minEndOffset = file_.size();
204     }
205 
206     readState_.resetAllValues();
207     currentEvent_ = null;
208 
209     try {
210       file_.seek(chunk * chunkSize_);
211       offset_ = chunk * chunkSize_;
212     } catch (Exception e) {
213       throw new TTransportException("Error seeking to chunk", __FILE__,
214         __LINE__, e);
215     }
216 
217     if (seekToEnd) {
218       // Never wait on the end of the file for new content, we just want to
219       // find the last one.
220       auto oldReadTimeout = readTimeout_;
221       scope (exit) readTimeout_ = oldReadTimeout;
222       readTimeout_ = dur!"hnsecs"(0);
223 
224       // Keep on reading unti the last event at point of seekToChunk call.
225       while ((offset_ + readState_.bufferPos_) < minEndOffset) {
226         if (readEvent() is null) {
227           break;
228         }
229       }
230     }
231   }
232 
233   void seekToEnd() {
234     seekToChunk(getNumChunks());
235   }
236 
237   /**
238    * The size of the chunks the file is divided into, in bytes.
239    */
240   ulong chunkSize() @property const {
241     return chunkSize_;
242   }
243 
244   /// ditto
245   void chunkSize(ulong value) @property {
246     enforce(!isOpen, new TTransportException(
247       "Cannot set chunk size after TFileReaderTransport has been opened."));
248     enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
249       "be large enough to accomodate at least a single byte of payload data."));
250     chunkSize_ = value;
251   }
252 
253   /**
254    * If positive, wait the specified duration for new data when arriving at
255    * end of file. If negative, wait forever (tailing mode), waking up to check
256    * in the specified interval. If zero, do not wait at all.
257    *
258    * Defaults to 500 ms.
259    */
260   Duration readTimeout() @property const {
261     return readTimeout_;
262   }
263 
264   /// ditto
265   void readTimeout(Duration value) @property {
266     readTimeout_ = value;
267   }
268 
269   /// ditto
270   enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);
271 
272   /**
273    * Read buffer size, in bytes.
274    *
275    * Defaults to 1 MiB.
276    */
277   size_t readBufferSize() @property const {
278     return readBufferSize_;
279   }
280 
281   /// ditto
282   void readBufferSize(size_t value) @property {
283     if (readBuffer_) {
284       enforce(value <= readBufferSize_,
285         "Cannot shrink read buffer after first read.");
286       readBuffer_.length = value;
287     }
288     readBufferSize_ = value;
289   }
290 
291   /// ditto
292   enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;
293 
294   /**
295    * Arbitrary event size limit, in bytes. Must be smaller than chunk size.
296    *
297    * Defaults to zero (no limit).
298    */
299   size_t maxEventSize() @property const {
300     return maxEventSize_;
301   }
302 
303   /// ditto
304   void maxEventSize(size_t value) @property {
305     enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
306       "mutiple chunks, maxEventSize must be smaller than chunk size.");
307     maxEventSize_ = value;
308   }
309 
310   /// ditto
311   enum DEFAULT_MAX_EVENT_SIZE = 0;
312 
313   /**
314    * The interval at which the thread wakes up to check for the next chunk
315    * in tailing mode.
316    *
317    * Defaults to one second.
318    */
319   Duration corruptedEventSleepDuration() const {
320     return corruptedEventSleepDuration_;
321   }
322 
323   /// ditto
324   void corruptedEventSleepDuration(Duration value) {
325     corruptedEventSleepDuration_ = value;
326   }
327 
328   /// ditto
329   enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);
330 
331   /**
332    * The maximum number of corrupted events tolerated before the whole chunk
333    * is skipped.
334    *
335    * Defaults to zero.
336    */
337   uint maxCorruptedEvents() @property const {
338     return maxCorruptedEvents_;
339   }
340 
341   /// ditto
342   void maxCorruptedEvents(uint value) @property {
343     maxCorruptedEvents_ = value;
344   }
345 
346   /// ditto
347   enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;
348 
349 private:
350   ubyte[] readEvent() {
351     if (!readBuffer_) {
352       readBuffer_ = new ubyte[readBufferSize_];
353     }
354 
355     bool timeoutExpired;
356     while (1) {
357       // read from the file if read buffer is exhausted
358       if (readState_.bufferPos_ == readState_.bufferLen_) {
359         // advance the offset pointer
360         offset_ += readState_.bufferLen_;
361 
362         try {
363           // Need to clear eof flag before reading, otherwise tailing a file
364           // does not work.
365           file_.clearerr();
366 
367           auto usedBuf = file_.rawRead(readBuffer_);
368           readState_.bufferLen_ = usedBuf.length;
369         } catch (Exception e) {
370           readState_.resetAllValues();
371           throw new TTransportException("Error while reading from file",
372             __FILE__, __LINE__, e);
373         }
374 
375         readState_.bufferPos_ = 0;
376         readState_.lastDispatchPos_ = 0;
377 
378         if (readState_.bufferLen_ == 0) {
379           // Reached end of file.
380           if (readTimeout_ < dur!"hnsecs"(0)) {
381             // Tailing mode, sleep for the specified duration and try again.
382             Thread.sleep(-readTimeout_);
383             continue;
384           } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
385             // Either no timeout set, or it has already expired.
386             readState_.resetState(0);
387             return null;
388           } else {
389             // Timeout mode, sleep for the specified amount of time and retry.
390             Thread.sleep(readTimeout_);
391             timeoutExpired = true;
392             continue;
393           }
394         }
395       }
396 
397       // Attempt to read an event from the buffer.
398       while (readState_.bufferPos_ < readState_.bufferLen_) {
399         if (readState_.readingSize_) {
400           if (readState_.eventSizeBuffPos_ == 0) {
401             if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
402               ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
403             {
404               readState_.bufferPos_++;
405               continue;
406             }
407           }
408 
409           readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
410             readBuffer_[readState_.bufferPos_++];
411 
412           if (readState_.eventSizeBuffPos_ == 4) {
413             auto size = (cast(uint[])readState_.eventSizeBuff_)[0];
414 
415             if (size == 0) {
416               // This is part of the zero padding between chunks.
417               readState_.resetState(readState_.lastDispatchPos_);
418               continue;
419             }
420 
421             // got a valid event
422             readState_.readingSize_ = false;
423             readState_.eventLen_ = size;
424             readState_.eventPos_ = 0;
425 
426             // check if the event is corrupted and perform recovery if required
427             if (isEventCorrupted()) {
428               performRecovery();
429               // start from the top
430               break;
431             }
432           }
433         } else {
434           if (!readState_.event_) {
435             readState_.event_ = new ubyte[readState_.eventLen_];
436           }
437 
438           // take either the entire event or the remaining bytes in the buffer
439           auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
440             readState_.eventLen_ - readState_.eventPos_);
441 
442           // copy data from read buffer into event buffer
443           readState_.event_[
444             readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
445           ] = readBuffer_[
446             readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
447           ];
448 
449           // increment position ptrs
450           readState_.eventPos_ += reclaimBuffer;
451           readState_.bufferPos_ += reclaimBuffer;
452 
453           // check if the event has been read in full
454           if (readState_.eventPos_ == readState_.eventLen_) {
455             // Reset the read state and return the completed event.
456             auto completeEvent = readState_.event_;
457             readState_.event_ = null;
458             readState_.resetState(readState_.bufferPos_);
459             return completeEvent;
460           }
461         }
462       }
463     }
464   }
465 
466   bool isEventCorrupted() {
467     if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
468       // Event size is larger than user-speficied max-event size
469       logError("Corrupt event read: Event size (%s) greater than max " ~
470         "event size (%s)", readState_.eventLen_, maxEventSize_);
471       return true;
472     } else if (readState_.eventLen_ > chunkSize_) {
473       // Event size is larger than chunk size
474       logError("Corrupt event read: Event size (%s) greater than chunk " ~
475         "size (%s)", readState_.eventLen_, chunkSize_);
476       return true;
477     } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
478       ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - EventSize.sizeof) / chunkSize_))
479     {
480       // Size indicates that event crosses chunk boundary
481       logError("Read corrupt event. Event crosses chunk boundary. " ~
482         "Event size: %s. Offset: %s", readState_.eventLen_,
483         (offset_ + readState_.bufferPos_ + EventSize.sizeof)
484       );
485 
486       return true;
487     }
488 
489     return false;
490   }
491 
492   void performRecovery() {
493     // perform some kickass recovery
494     auto curChunk = getCurChunk();
495     if (lastBadChunk_ == curChunk) {
496       numCorruptedEventsInChunk_++;
497     } else {
498       lastBadChunk_ = curChunk;
499       numCorruptedEventsInChunk_ = 1;
500     }
501 
502     if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
503       // maybe there was an error in reading the file from disk
504       // seek to the beginning of chunk and try again
505       seekToChunk(curChunk);
506     } else {
507       // Just skip ahead to the next chunk if we not already at the last chunk.
508       if (curChunk != (getNumChunks() - 1)) {
509         seekToChunk(curChunk + 1);
510       } else if (readTimeout_ < dur!"hnsecs"(0)) {
511         // We are in tailing mode, wait until there is enough data to start
512         // the next chunk.
513         while(curChunk == (getNumChunks() - 1)) {
514           Thread.sleep(corruptedEventSleepDuration_);
515         }
516         seekToChunk(curChunk + 1);
517       } else {
518         // Pretty hosed at this stage, rewind the file back to the last
519         // successful point and punt on the error.
520         readState_.resetState(readState_.lastDispatchPos_);
521         currentEvent_ = null;
522         currentEventPos_ = 0;
523 
524         throw new TTransportException("File corrupted at offset: " ~
525           to!string(offset_ + readState_.lastDispatchPos_),
526           TTransportException.Type.CORRUPTED_DATA);
527       }
528     }
529   }
530 
531   string path_;
532   File file_;
533   bool isOpen_;
534   long offset_;
535   ubyte[] currentEvent_;
536   size_t currentEventPos_;
537   ulong chunkSize_;
538   Duration readTimeout_;
539   size_t maxEventSize_;
540 
541   // Read buffer – lazily allocated on the first read().
542   ubyte[] readBuffer_;
543   size_t readBufferSize_;
544 
545   static struct ReadState {
546     ubyte[] event_;
547     size_t eventLen_;
548     size_t eventPos_;
549 
550     // keep track of event size
551     ubyte[4] eventSizeBuff_;
552     ubyte eventSizeBuffPos_;
553     bool readingSize_ = true;
554 
555     // read buffer variables
556     size_t bufferPos_;
557     size_t bufferLen_;
558 
559     // last successful dispatch point
560     size_t lastDispatchPos_;
561 
562     void resetState(size_t lastDispatchPos) {
563       readingSize_ = true;
564       eventSizeBuffPos_ = 0;
565       lastDispatchPos_ = lastDispatchPos;
566     }
567 
568     void resetAllValues() {
569       resetState(0);
570       bufferPos_ = 0;
571       bufferLen_ = 0;
572       event_ = null;
573     }
574   }
575   ReadState readState_;
576 
577   ulong lastBadChunk_;
578   uint maxCorruptedEvents_;
579   uint numCorruptedEventsInChunk_;
580   Duration corruptedEventSleepDuration_;
581 }
582 
583 /**
584  * A transport used to write log files. It can never be read from, calling
585  * read() throws.
586  *
587  * Contrary to the C++ design, explicitly opening the transport/file before
588  * using is necessary to allow manually closing the file without relying on the
589  * object lifetime.
590  */
591 final class TFileWriterTransport : TBaseTransport {
592   /**
593    * Creates a new file writer transport.
594    *
595    * Params:
596    *   path = Path of the file to opperate on.
597    */
598   this(string path) {
599     path_ = path;
600 
601     chunkSize_ = DEFAULT_CHUNK_SIZE;
602     eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
603     ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION;
604     maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
605     maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
606   }
607 
608   override bool isOpen() @property {
609     return isOpen_;
610   }
611 
612   /**
613    * A file writer transport can never be read from.
614    */
615   override bool peek() {
616     return false;
617   }
618 
619   override void open() {
620     if (isOpen) return;
621 
622     writerThread_ = spawn(
623       &writerThread,
624       path_,
625       chunkSize_,
626       maxFlushBytes_,
627       maxFlushInterval_,
628       ioErrorSleepDuration_
629     );
630     setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
631     isOpen_ = true;
632   }
633 
634   /**
635    * Closes the transport, i.e. the underlying file and the writer thread.
636    */
637   override void close() {
638     if (!isOpen) return;
639 
640     prioritySend(writerThread_, ShutdownMessage(), thisTid); // FIXME: Should use normal send here.
641     receive((ShutdownMessage msg, Tid tid){});
642     isOpen_ = false;
643   }
644 
645   /**
646    * Enqueues the passed slice of data for writing and immediately returns.
647    * write() only blocks if the event buffer has been exhausted.
648    *
649    * The transport must be open when calling this.
650    *
651    * Params:
652    *   buf = Slice of data to write.
653    */
654   override void write(in ubyte[] buf) {
655     enforce(isOpen, new TTransportException(
656       "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));
657 
658     if (buf.empty) {
659       logError("Cannot write empty event, skipping.");
660       return;
661     }
662 
663     auto maxSize = chunkSize - EventSize.sizeof;
664     enforce(buf.length <= maxSize, new TTransportException(
665       "Cannot write more than " ~ to!string(maxSize) ~
666       "bytes at once due to chunk size."));
667 
668     send(writerThread_, buf.idup);
669   }
670 
671   /**
672    * Flushes any pending data to be written.
673    *
674    * The transport must be open when calling this.
675    *
676    * Throws: TTransportException if an error occurs.
677    */
678   override void flush() {
679     enforce(isOpen, new TTransportException(
680       "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));
681 
682     send(writerThread_, FlushMessage(), thisTid);
683     receive((FlushMessage msg, Tid tid){});
684   }
685 
686   /**
687    * The size of the chunks the file is divided into, in bytes.
688    *
689    * A single event (write call) never spans multiple chunks – this
690    * effectively limits the event size to chunkSize - EventSize.sizeof.
691    */
692   ulong chunkSize() @property {
693     return chunkSize_;
694   }
695 
696   /// ditto
697   void chunkSize(ulong value) @property {
698     enforce(!isOpen, new TTransportException(
699       "Cannot set chunk size after TFileWriterTransport has been opened."));
700     chunkSize_ = value;
701   }
702 
703   /**
704    * The maximum number of write() calls buffered, or zero for no limit.
705    *
706    * If the buffer is exhausted, write() will block until space becomes
707    * available.
708    */
709   size_t eventBufferSize() @property {
710     return eventBufferSize_;
711   }
712 
713   /// ditto
714   void eventBufferSize(size_t value) @property {
715     eventBufferSize_ = value;
716     if (isOpen) {
717       setMaxMailboxSize(writerThread_, value, OnCrowding.throwException);
718     }
719   }
720 
721   /// ditto
722   enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;
723 
724   /**
725    * Maximum number of bytes buffered before writing and flushing the file
726    * to disk.
727    *
728    * Currently cannot be set after the first call to write().
729    */
730   size_t maxFlushBytes() @property {
731     return maxFlushBytes_;
732   }
733 
734   /// ditto
735   void maxFlushBytes(size_t value) @property {
736     maxFlushBytes_ = value;
737     if (isOpen) {
738       send(writerThread_, FlushBytesMessage(value));
739     }
740   }
741 
742   /// ditto
743   enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;
744 
745   /**
746    * Maximum interval between flushing the file to disk.
747    *
748    * Currenlty cannot be set after the first call to write().
749    */
750   Duration maxFlushInterval() @property {
751     return maxFlushInterval_;
752   }
753 
754   /// ditto
755   void maxFlushInterval(Duration value) @property {
756     maxFlushInterval_ = value;
757     if (isOpen) {
758       send(writerThread_, FlushIntervalMessage(value));
759     }
760   }
761 
762   /// ditto
763   enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);
764 
765   /**
766    * When the writer thread encounteres an I/O error, it goes pauses for a
767    * short time before trying to reopen the output file. This controls the
768    * sleep duration.
769    */
770   Duration ioErrorSleepDuration() @property {
771     return ioErrorSleepDuration_;
772   }
773 
774   /// ditto
775   void ioErrorSleepDuration(Duration value) @property {
776     ioErrorSleepDuration_ = value;
777     if (isOpen) {
778       send(writerThread_, FlushIntervalMessage(value));
779     }
780   }
781 
782   /// ditto
783   enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);
784 
785 private:
786   string path_;
787   ulong chunkSize_;
788   size_t eventBufferSize_;
789   Duration ioErrorSleepDuration_;
790   size_t maxFlushBytes_;
791   Duration maxFlushInterval_;
792   bool isOpen_;
793   Tid writerThread_;
794 }
795 
private {
  // Signals that the file should be flushed on disk. Sent to the writer
  // thread and sent back along with the tid for confirmation.
  struct FlushMessage {}

  // Signals that the writer thread should close the file and shut down. Sent
  // to the writer thread and sent back along with the tid for confirmation.
  struct ShutdownMessage {}

  // Updates the writer thread's maxFlushBytes setting.
  struct FlushBytesMessage {
    size_t value;
  }

  // Updates the writer thread's maxFlushInterval setting.
  struct FlushIntervalMessage {
    Duration value;
  }

  // Updates the writer thread's ioErrorSleepDuration setting.
  struct IoErrorSleepDurationMessage {
    Duration value;
  }

  /*
   * Entry point of the writer thread spawned by TFileWriterTransport.open().
   *
   * Appends each received event (an immutable(ubyte)[] message) to the file
   * at path, prefixed by its length as an EventSize. The event is preceded by
   * zero padding whenever it would otherwise cross a chunk boundary.
   *
   * The file is flushed when more than maxFlushBytes are unflushed, when
   * maxFlushInterval has elapsed with data pending, on a FlushMessage
   * (acknowledged back to the sender), and on shutdown.
   *
   * While the file cannot be opened, each incoming event makes the thread
   * retry opening it every ioErrorSleepDuration. A ShutdownMessage (or
   * termination of the owner thread) received meanwhile ends the retries;
   * the event is dropped and the thread shuts down.
   */
  void writerThread(
    string path,
    ulong chunkSize,
    size_t maxFlushBytes,
    Duration maxFlushInterval,
    Duration ioErrorSleepDuration
  ) {
    bool errorOpening;
    File file;
    ulong offset;
    try {
      // Open file in appending and binary mode.
      file = File(path, "ab");
      offset = file.tell();
    } catch (Exception e) {
      logError("Error on opening output file in writer thread: %s", e);
      errorOpening = true;
    }

    auto flushTimer = StopWatch(AutoStart.yes);
    size_t unflushedByteCount;

    Tid shutdownRequestTid;
    bool shutdownRequested;
    while (true) {
      if (shutdownRequested) break;

      bool forceFlush;
      Tid flushRequestTid;
      receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
        (immutable(ubyte)[] data) {
          while (errorOpening) {
            // total!"usecs" is the whole duration; fracSec.usecs would only
            // be its sub-second part.
            logError("Writer thread going to sleep for %s µs due to IO errors",
              ioErrorSleepDuration.total!"usecs");

            // Sleep for ioErrorSleepDuration, being ready to be interrupted
            // by shutdown requests. receiveTimeout returns true if one of the
            // handlers was invoked, and false if the timeout expired.
            auto interrupted = receiveTimeout(ioErrorSleepDuration,
              (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; },
              (OwnerTerminated msg){});
            if (interrupted) {
              // We got a shutdown request, just drop all events and exit the
              // main loop as to not block application shutdown with our tries
              // which we must assume to fail.
              shutdownRequested = true;
              return;
            }

            try {
              file = File(path, "ab");
              // Re-read the write position just like after the initial open,
              // so the chunk padding below is computed against the reopened
              // file rather than a stale offset.
              offset = file.tell();
              unflushedByteCount = 0;
              errorOpening = false;
              logError("Output file %s reopened during writer thread error " ~
                "recovery", path);
            } catch (Exception e) {
              logError("Unable to reopen output file %s during writer " ~
                "thread error recovery", path);
            }
          }

          // Make sure the event does not cross the chunk boundary by writing
          // a padding consisting of zeroes if it would.
          auto chunk1 = offset / chunkSize;
          auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;

          if (chunk1 != chunk2) {
            // TODO: The C++ implementation refetches the offset here to »keep
            // in sync« – why would this be needed?
            auto padding = cast(size_t)
              ((((offset / chunkSize) + 1) * chunkSize) - offset);
            auto zeroes = new ubyte[padding];
            file.rawWrite(zeroes);
            unflushedByteCount += padding;
            offset += padding;
          }

          // TODO: 2 syscalls here, is this a problem performance-wise?
          // Probably abysmal performance on Windows due to rawWrite
          // implementation.
          uint len = cast(uint)data.length;
          file.rawWrite(cast(ubyte[])(&len)[0..1]);
          file.rawWrite(data);

          auto bytesWritten = EventSize.sizeof + data.length;
          unflushedByteCount += bytesWritten;
          offset += bytesWritten;
        }, (FlushBytesMessage msg) {
          maxFlushBytes = msg.value;
        }, (FlushIntervalMessage msg) {
          maxFlushInterval = msg.value;
        }, (IoErrorSleepDurationMessage msg) {
          ioErrorSleepDuration = msg.value;
        }, (FlushMessage msg, Tid tid) {
          forceFlush = true;
          flushRequestTid = tid;
        }, (OwnerTerminated msg) {
          shutdownRequested = true;
        }, (ShutdownMessage msg, Tid tid) {
          shutdownRequested = true;
          shutdownRequestTid = tid;
        }
      );

      // Without an open file there is nothing to flush; if a shutdown was
      // requested, the check at the top of the loop exits.
      if (errorOpening) continue;

      bool flush;
      if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
        flush = true;
      } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
        if (unflushedByteCount == 0) {
          // If the flush timer is due, but no data has been written, don't
          // needlessly fsync, but do reset the timer.
          flushTimer.reset();
        } else {
          flush = true;
        }
      }

      if (flush) {
        file.flush();
        flushTimer.reset();
        unflushedByteCount = 0;
        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
      }
    }

    // std.stdio.File.close() is a no-op for a never-opened File, which is the
    // case if we are shutting down during error recovery.
    file.close();

    if (shutdownRequestTid != Tid.init) {
      send(shutdownRequestTid, ShutdownMessage(), thisTid);
    }
  }
}
948 
949 version (unittest) {
950   import core.memory : GC;
951   import std.file;
952 }
953 
unittest {
  // Tests for TFileWriterTransport/TFileReaderTransport. Every section works
  // on the same temporary file in the current directory and removes it again
  // on scope exit.

  // Deletes fileName, deliberately ignoring errors (e.g. if a section failed
  // before the file was ever created).
  void tryRemove(string fileName) {
    try {
      remove(fileName);
    } catch (Exception) {}
  }

  immutable fileName = "unittest.dat.tmp";
  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
    " already exists.");

  /*
   * Check the most basic reading/writing operations.
   */
  {
    scope (exit) tryRemove(fileName);

    auto writer = new TFileWriterTransport(fileName);
    writer.open();
    scope (exit) writer.close();

    writer.write([1, 2]);
    writer.write([3, 4]);
    writer.write([5, 6, 7]);
    writer.flush();

    auto reader = new TFileReaderTransport(fileName);
    reader.open();
    scope (exit) reader.close();

    auto buf = new ubyte[7];
    reader.readAll(buf);
    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
  }

  /*
   * Check that chunking works as expected.
   */
  {
    scope (exit) tryRemove(fileName);

    // The expected raw layout below assumes a 4-byte event size header.
    static assert(EventSize.sizeof == 4);
    enum CHUNK_SIZE = 10;

    // Write some contents to the file.
    {
      auto writer = new TFileWriterTransport(fileName);
      writer.chunkSize = CHUNK_SIZE;
      writer.open();
      scope (exit) writer.close();

      writer.write([0xde]);
      writer.write([0xad]);
      // Chunk boundary here.
      writer.write([0xbe]);
      // The next write doesn't fit in the five bytes remaining, so we expect
      // padding zero bytes to be written.
      writer.write([0xef, 0x12]);

      // An event as large as the whole chunk (plus its header) can never
      // fit, so the writer is expected to reject it.
      try {
        writer.write(new ubyte[CHUNK_SIZE]);
        enforce(false, "Could write event not fitting in a single chunk.");
      } catch (TTransportException e) {}

      writer.flush();
    }

    // Check the raw contents of the file to see if chunk padding was written
    // as expected. The file is opened in binary mode so that no newline/EOF
    // translation can happen on platforms with a distinct text mode.
    auto file = File(fileName, "rb");
    enforce(file.size == 26);
    auto written = new ubyte[26];
    enforce(file.rawRead(written).length == written.length,
      "Could not read back the complete test file.");
    enforce(written == [
      1, 0, 0, 0, 0xde,
      1, 0, 0, 0, 0xad,
      1, 0, 0, 0, 0xbe,
      0, 0, 0, 0, 0,
      2, 0, 0, 0, 0xef, 0x12
    ]);

    // Read the data back in, getting all the events at once.
    {
      auto reader = new TFileReaderTransport(fileName);
      reader.chunkSize = CHUNK_SIZE;
      reader.open();
      scope (exit) reader.close();

      auto buf = new ubyte[5];
      reader.readAll(buf);
      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
    }
  }

  /*
   * Make sure that close() exits "quickly", i.e. that there is no problem
   * with the worker thread waking up.
   */
  {
    import std.conv : text;
    enum NUM_ITERATIONS = 1000;

    // A close() taking longer than this counts as "slow".
    enum SLOW_CLOSE_MSECS = 5;

    // A single close() taking this long is treated as a hard failure.
    enum MAX_CLOSE_MSECS = 500;

    uint numOver = 0;
    foreach (n; 0 .. NUM_ITERATIONS) {
      scope (exit) tryRemove(fileName);

      auto transport = new TFileWriterTransport(fileName);
      transport.open();

      // Write something so that the writer thread gets started.
      transport.write(cast(ubyte[])"foo");

      // Every other iteration, also call flush(), just in case that potentially
      // has any effect on how the writer thread wakes up.
      if (n & 0x1) {
        transport.flush();
      }

      // Time the call to close().
      auto sw = StopWatch(AutoStart.yes);
      transport.close();
      sw.stop();
      immutable elapsedMsecs = sw.peek().msecs;

      // If any attempt takes more than MAX_CLOSE_MSECS, treat that as a fatal
      // failure to avoid looping over a potentially very slow operation.
      enforce(elapsedMsecs < MAX_CLOSE_MSECS,
        text("close() took ", elapsedMsecs, "ms."));

      // Normally, it takes less than 5ms on my dev box.
      // However, if the box is heavily loaded, some of the test runs can take
      // longer. Additionally, on a Windows Server 2008 instance running in
      // a VirtualBox VM, it has been observed that about a quarter of the runs
      // takes (217 ± 1) ms, for reasons not yet known.
      if (elapsedMsecs > SLOW_CLOSE_MSECS) {
        ++numOver;
      }

      // Force garbage collection runs every now and then to make sure we
      // don't run out of OS thread handles.
      if (!(n % 100)) GC.collect();
    }

    // Make sure fewer than a third of the runs were slow. The message reports
    // the same threshold that the check above actually used.
    enforce(numOver < NUM_ITERATIONS / 3,
      text(numOver, " iterations took more than ", SLOW_CLOSE_MSECS, " ms."));
  }
}