/*  recqueue - Record queue file control

Usage:

    #include                For IO_ERROR and its return code values.
    #include

    QUEUE *
    RQ_open (filename, record_size)
    char        *filename;      Name of record queue file to access.
    int         record_size;    Size of records when creating a new file.

    int
    RQ_close (queue)
    QUEUE       *queue;

    int
    RQ_size (queue)
    QUEUE       *queue;

    unsigned int
    RQ_total_records (queue)
    QUEUE       *queue;

    char *
    RQ_get (queue, record_number)
    QUEUE       *queue;
    unsigned    record_number;  Sequential queue record number to get.

    int
    RQ_enqueue (queue, data, amount)
    QUEUE       *queue;
    char        *data;          Record data to be queued.
    int         amount;         Amount (bytes) of data to go in the record.

    int
    RQ_dequeue (queue, data, amount)
    QUEUE       *queue;
    char        *data;          Dequeued record data will be copied here.
    int         amount;         Amount (bytes) of data to get from the record.

    int
    RQ_delete (queue, record_number)
    QUEUE       *queue;
    unsigned    record_number;  Sequential queue record number to delete.

    int
    RQ_update (queue)
    QUEUE       *queue;

Description:

    A record queue is a doubly linked list of data records stored in a
    file. The size of a data record must be the same for all records in
    the queue. It is specified when the record queue (RQ) file is created
    (which occurs the first time it is accessed), and stays fixed for the
    life of the file. Each record is stored in the file along with a
    header that links it to the previous and following records. A record
    may occupy any storage location in the file; new records are linked
    onto the tail of the queue, and records may be removed from anywhere
    in the queue.

    Empty space in the file is used to allocate new record storage
    locations. It is maintained as a singly linked list of empty records;
    a deleted record goes on the front of this list and is the first to
    be reused (LIFO). Additional space is added to the file as needed for
    new record storage.

    RQ_open - The specified filename is opened in append mode. Thus if
        the file does not exist, a new one is created (if this is
        undesirable, the user should check whether the file exists
        before calling this function). A QUEUE data structure is
        allocated (as described in recqueue.h).
        If memory can't be allocated, IO_ERROR is set to IE_NBF (No
        Buffer). Should the file be in use, up to MAX_ATTEMPTS retries,
        with a PAUSE_TIME second wait between them, will be made (these
        values are currently set to 5 and 2 in recqueue.h); ultimately,
        IO_ERROR will be set to QUEUE_BUSY if the file remains busy.

        When the file exists, the QUEUE_HEADER occupying the first record
        location in the file is copied into the QUEUE data structure. If
        the user specifies a non-zero record_size, this value is taken as
        the maximum amount of data expected to be transferred and is
        checked against the actual size of records in the queue; if
        record_size is greater than that stored in the queue, IO_ERROR is
        set to IE_BAD and the queue is not opened. If record_size is 0
        this check is not done.

        When a new file is created, the specified record_size is
        incremented by the size of a RECORD_HEADER, then checked to be no
        larger than Q_BUF_SIZE (512 bytes in recqueue.h) and no smaller
        than the size of a QUEUE_HEADER. It is then adjusted upwards, if
        necessary, until it divides Q_BUF_SIZE evenly. A failed validity
        check sets IO_ERROR to IE_BAD (Bad parameter). The QUEUE is
        initialized and its QUEUE_HEADER is copied (using put_queue) into
        a new (empty) block in the file.

        On success the function returns the QUEUE pointer, which must be
        used in all subsequent function calls in this module. It returns
        NULL on failure with IO_ERROR set appropriately.

    RQ_close - The file is closed (at EOF position), and the QUEUE freed.

    RQ_size - The size of the data storage area of a record (i.e.
        excluding the RECORD_HEADER associated with each record) is
        returned.

    RQ_total_records - The current number of records in the queue is
        returned as an unsigned integer.

    RQ_get - The record with the specified number (an unsigned integer)
        is accessed.
        Records are numbered sequentially, starting with record 0 at the
        head of the queue, in the order in which they were placed in the
        queue. If there are fewer records queued than the record
        specified, IO_ERROR is set to EOQ (End of Queue). A pointer to
        the record's data is returned, or NULL on failure.

        >>> NOTE <<< If a record is current (the QUEUE's record_number is
        valid), the shortest distance - in terms of sequential records -
        from the current record, the head of the queue, or the tail of
        the queue is used to locate the specified record. Otherwise the
        shortest distance from the head or tail of the queue is used.

    RQ_enqueue - The next empty record is accessed and the user's data is
        copied into it. Empty space is automatically allocated to the
        file to generate new empty records as needed. The "amount" of
        "data" is copied (unless amount is larger than the record_size,
        in which case IO_ERROR is set to IE_BAD). The formerly empty
        record is removed from the empty queue and linked onto the tail
        of the records queue. The previous tail record is linked forward
        to the new record. The QUEUE_HEADER is updated.

    RQ_dequeue - If the queue is not empty, the record at the head of the
        queue is accessed and the specified amount of its data is copied
        into the user's data storage area. If the amount of data
        requested is larger than the record_size, IO_ERROR is set to
        IE_BAD. The record is then deleted from the queue.

    RQ_delete - The specified record number is accessed and linked to the
        empty queue. The previous and following records (if they exist)
        are accessed and linked to each other, thus unlinking the deleted
        record. The QUEUE_HEADER is updated.

    RQ_update - The current queue file buffer is written to the file at
        the location from which it was read. The update flag is cleared.

    >>> NOTE <<< All functions, except where otherwise noted, return a
    SUCCESS or FAILURE status. On failure, IO_ERROR contains a code value
    (as per the RSX I/O error codes in qioret.h) indicating the cause of
    the failure.
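
    The linking done by RQ_enqueue and RQ_delete can be pictured with a
    small in-memory model. This is only a sketch under stated
    assumptions, not the module's code: MemQueue, Slot, mq_init,
    mq_enqueue, mq_delete, SLOTS, DATA_LEN and NONE are hypothetical
    names, array indices stand in for file locations, and the capacity
    is fixed where the real file grows on demand.

```c
#include <assert.h>
#include <string.h>

#define SLOTS    8      // hypothetical capacity; the real file grows on demand
#define DATA_LEN 16     // hypothetical fixed record data size
#define NONE     (-1)   // stands in for the NULL file location

typedef struct {
    int  previous, following;   // links, in the role of a RECORD_HEADER
    char data[DATA_LEN];
} Slot;

typedef struct {
    int  head, tail, empty;     // mirrors the head, tail and empty locations
    unsigned total_records;
    Slot slot[SLOTS];
} MemQueue;

// Chain every slot onto the empty list, much as new_block does for a block.
void mq_init(MemQueue *q)
{
    assert(q != NULL);
    q->head = q->tail = NONE;
    q->total_records = 0;
    for (int i = 0; i < SLOTS; i++)
        q->slot[i].following = (i + 1 < SLOTS) ? i + 1 : NONE;
    q->empty = 0;
}

// Take the first empty slot and link it onto the tail of the queue.
int mq_enqueue(MemQueue *q, const char *data, int amount)
{
    if (amount < 0 || amount > DATA_LEN || q->empty == NONE)
        return 0;
    int s = q->empty;
    q->empty = q->slot[s].following;
    memcpy(q->slot[s].data, data, (size_t)amount);
    q->slot[s].previous = q->tail;
    q->slot[s].following = NONE;
    if (q->tail != NONE)
        q->slot[q->tail].following = s;
    else
        q->head = s;            // first record in the queue
    q->tail = s;
    q->total_records++;
    return 1;
}

// Unlink record number n (0 = head) and push its slot on the empty list.
int mq_delete(MemQueue *q, unsigned n)
{
    if (n >= q->total_records)
        return 0;
    int s = q->head;
    while (n--)
        s = q->slot[s].following;
    int p = q->slot[s].previous, f = q->slot[s].following;
    if (p != NONE) q->slot[p].following = f; else q->head = f;
    if (f != NONE) q->slot[f].previous = p; else q->tail = p;
    q->slot[s].following = q->empty;    // front of the empty list
    q->empty = s;
    q->total_records--;
    return 1;
}
```

    In this model, enqueueing three records and then deleting record 1
    leaves record 0 linked directly to the old record 2, and the freed
    slot is the one the next enqueue reuses.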
    >>> WARNING <<< FAILURE conditions resulting from unexpected disk I/O
    errors may leave the queue in a corrupted condition.

Internal (static) functions:

    put_queue - The current location (i.e. the byte offset into the file
        from which the current buffer contents were read) is noted and,
        if the first block of the file is not in the buffer, that block
        is loaded (get_buffer). The QUEUE_HEADER is copied into the
        buffer, which is then updated into the file. The queue is then
        returned to its previous location. The effect is to update the
        QUEUE_HEADER in the file yet leave the buffer contents unchanged.

    get_buffer - If the specified file location (byte offset from the
        beginning of the file) is not currently in the queue buffer, it
        will be read into the buffer. If the queue update flag is set,
        the current buffer will be updated (RQ_update) before the new
        file block is loaded. When an EOF (End of File) condition is
        encountered while attempting to load a file block, a new block
        (new_block) will be added to the file.

    new_block - The queue buffer is cleared and initialized with empty
        records. Empty records are forward linked to the next empty
        record, with the last empty record referring forward to the
        beginning of a non-existent file block, which will be added
        automatically by this function when an attempt is made to get
        the buffer which contains that location (see get_buffer). The
        initialized buffer is then written to the file and the new queue
        end location noted.
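
    The block arithmetic behind get_buffer, new_block and the record
    size adjustment in RQ_open can be worked through in isolation. This
    is a minimal sketch, assuming the 512-byte Q_BUF_SIZE quoted above;
    round_record_size, buf_location, buf_offset and chain_empty_records
    are hypothetical helper names that do not exist in this module.

```c
#include <assert.h>

#define Q_BUF_SIZE 512L   // block size quoted in the text for recqueue.h

// Round a record size (data plus header) up until it divides the block
// size evenly, as RQ_open does for a new file. Returns 0 if out of range.
long round_record_size(long size)
{
    if (size <= 0 || size > Q_BUF_SIZE)
        return 0;
    while (Q_BUF_SIZE % size)
        size++;
    return size;
}

// Byte offset of the start of the block holding a file location.
long buf_location(long location)
{
    return location - location % Q_BUF_SIZE;
}

// Offset of a file location within its block.
long buf_offset(long location)
{
    return location % Q_BUF_SIZE;
}

// Fill `following` with the forward links stored in a block appended at
// file offset `end`; returns the number of records in the block, or 0 if
// the caller's array is too small. The last link equals the new end of
// file, i.e. the start of a block that has not been written yet.
int chain_empty_records(long end, long size, long following[], int max)
{
    assert(size > 0 && Q_BUF_SIZE % size == 0);
    int count = (int)(Q_BUF_SIZE / size);
    if (count > max)
        return 0;
    long location = end;
    for (int i = 0; i < count; i++) {
        location += size;
        following[i] = location;
    }
    return count;
}
```

    For example, a 100-byte record (header included) is rounded up to
    128 bytes, giving four records per block; a block appended at offset
    512 chains its empty records to 640, 768, 896 and finally 1024, the
    start of the next block to be added.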
******************************************************************************/

#include
#include
#include

/*#define DEBUG */

#define $$RQ
#include "recqueue.h"

#define RECORD_SIZE (\
    queue->control.record_size - sizeof (RECORD_HEADER)\
    )

#define BUF_LOCATION(record_location) (\
    (record_location) - ((record_location) % Q_BUF_SIZE)\
    )

#define REC_POINTER(record_location) (\
    (RECORD_HEADER *)&queue->buffer\
    [(INTEGER)((record_location) % Q_BUF_SIZE)]\
    )


QUEUE *
RQ_open (filename, record_size)
STRING  filename;
INTEGER record_size;
{
    QUEUE   *queue;
    INTEGER attempt, old_file;
    GLOBAL LONG ftell ();

    IO_ERROR = 0;

    /* Allocate a QUEUE and initialize it. */
    if (! (queue = (QUEUE *)malloc (sizeof (QUEUE)))) {
        IO_ERROR = IE_NBF;  /* No Buffer */
        /* Return directly: the no_open cleanup dereferences queue. */
        return (NULL);
    }
    zero (queue, sizeof (QUEUE) - Q_BUF_SIZE);
#ifdef DEBUG
    printf ("RQ_open: queue allocated (%d bytes)\n", sizeof (QUEUE));
#endif

    attempt = 0;
    while (! (queue->file = fopen (filename, "aun"))) {
        if (IO_ERROR == IE_WAC || IO_ERROR == IE_LCK) {
            /* File is Write Accessed or Locked */
            if (attempt++ == MAX_ATTEMPTS) {
                IO_ERROR = QUEUE_BUSY;
                goto no_open;
            }
            else
                sleep (PAUSE_TIME);
        }
        else
            goto no_open;
    }

    queue->end = ftell (queue->file);
    if (queue->end == NULL)
        old_file = FALSE;   /* This is a new file */
    else
        old_file = TRUE;    /* This is an existing file */
#ifdef DEBUG
    printf ("RQ_open: %s file open, end at %ld\n",
        (old_file ? "Old" : "New"), queue->end);
#endif

    /*  Initialize the queue:

        File is open and end position noted.
        Record pointer and record_number are NULL (i.e. no current record).
        If this is a new queue file:
            Add new block to the file.
            Adjust and set record_size.
            Reset empty location (first record space occupied by
                QUEUE_HEADER).
            Put queue control into file.
        Else:
            Load buffer from location 0.
            Copy control (QUEUE_HEADER) from buffer.
            Confirm user's record_size (unless 0 specified).
    */
    if (old_file) {
        queue->location = -1;   /* Prime the pump */
        if (get_buffer (queue, 0L)) {
            copy (&queue->control, &queue->buffer, sizeof (QUEUE_HEADER));
#ifdef DEBUG
            printf ("RQ_open: queue->control acquired\n");
#endif
            if (record_size > RECORD_SIZE)
                IO_ERROR = IE_BAD;
        }
    }
    else if ((record_size += sizeof (RECORD_HEADER)) <= Q_BUF_SIZE &&
             record_size >= sizeof (QUEUE_HEADER)) {
        /* Round the record size up until it divides Q_BUF_SIZE evenly. */
        while (Q_BUF_SIZE % record_size)
            record_size++;
        queue->control.record_size = record_size;
#ifdef DEBUG
        printf ("RQ_open: adjusted record size %d bytes\n", record_size);
#endif
        if (new_block (queue)) {
            queue->control.empty = record_size;
            put_queue (queue);
        }
    }
    else
        IO_ERROR = IE_BAD;  /* Bad parameter (record_size) */

    if (IO_ERROR && queue) {
no_open:
        attempt = IO_ERROR;     /* Save the error across the cleanup calls */
        if (queue->file) {
            if (! old_file)
                fgetname (queue->file, &queue->buffer);
            fseek (queue->file, queue->end, 0);
            fclose (queue->file);
            if (! old_file)
                delete (&queue->buffer);
        }
        free (queue);
        queue = NULL;
        IO_ERROR = attempt;
    }
    return (queue);
}


INTEGER
RQ_close (queue)
QUEUE   *queue;
{
    if (queue->update)
        RQ_update (queue);
#ifdef DEBUG
    printf ("RQ_close: seek queue->end at %ld\n", queue->end);
#endif
    fseek (queue->file, queue->end, 0);
    fclose (queue->file);
    free (queue);
    return (IO_ERROR ?
        FAILURE : SUCCESS);
}


INTEGER
RQ_size (queue)
QUEUE   *queue;
{
    return (RECORD_SIZE);
}


UCOUNT
RQ_total_records (queue)
QUEUE   *queue;
{
    return (queue->control.total_records);
}


STRING
RQ_get (queue, record_number)
QUEUE   *queue;
UCOUNT  record_number;
{
    UCOUNT REGISTER count, check;
    INTEGER forward;
    LONG next;

    if (record_number >= queue->control.total_records) {
        IO_ERROR = EOQ;
        return (NULL);
    }

    if (record_number <=
        (count = queue->control.total_records - record_number - 1)) {
        forward = TRUE;
        count = record_number + 1;
        next = queue->control.head;
#ifdef DEBUG
        printf ("RQ_get: go forward %d records from head at %ld\n",
            count - 1, next);
#endif
    }
    else {
        forward = FALSE;
        count++;
        next = queue->control.tail;
#ifdef DEBUG
        printf ("RQ_get: go backward %d records from tail at %ld\n",
            count - 1, next);
#endif
    }

    if (queue->record) {
        /* Record current in buffer. */
        if (record_number > queue->record_number &&
            (check = record_number - queue->record_number) <= count) {
            forward = TRUE;
            count = check;
            next = queue->record->following;
#ifdef DEBUG
            printf (" go forward %d records from current record\n", count);
#endif
        }
        else if ((check = queue->record_number - record_number) <= count) {
            forward = FALSE;
            count = check;
            next = queue->record->previous;
#ifdef DEBUG
            printf (" go backward %d records from current record\n",
                count, next);
#endif
        }
    }

    for (; count && next != NULL; count--) {
        /* Sequence through queue records until desired record reached
           or an end of queue detected. */
#ifdef DEBUG
        printf ("RQ_get: next at %ld\n", next);
#endif
        if (get_buffer (queue, next))
            next = forward ?
                (queue->record = REC_POINTER(next))->following :
                (queue->record = REC_POINTER(next))->previous;
        else
            return (NULL);
    }
    queue->record_number = forward ?
        record_number - count : record_number + count;
#ifdef DEBUG
    printf ("RQ_get:\n\
    queue->location = %ld\n\
    queue->record = %u\n\
    queue->record_number = %u\n",
        queue->location, queue->record, queue->record_number);
#endif
    if (count) {
        IO_ERROR = EOQ;
        return (NULL);
    }
    else
        return (queue->record + 1);     /* Return pointer to data */
}


INTEGER
RQ_enqueue (queue, data, amount)
QUEUE   *queue;
STRING  data;
INTEGER amount;
{
    LONG previous, following;

    if (amount > RECORD_SIZE) {
        IO_ERROR = IE_BAD;
        return (FAILURE);
    }

    if (get_buffer (queue, queue->control.empty)) {
        /* Store the data in an empty record,
           then link it to the end of queue. */
        queue->record = REC_POINTER(queue->control.empty);
        copy (queue->record + 1, data, amount);
#ifdef DEBUG
        printf ("RQ_enqueue: data copied into empty record @%u (location %ld)\n",
            queue->record, queue->control.empty);
#endif
        /* >>> CAUTION <<< The location values must be updated
           in proper order. */
        previous = queue->record->previous = queue->control.tail;
#ifdef DEBUG
        printf (" previous at %ld\n", previous);
#endif
        following = queue->control.tail = queue->control.empty;
        queue->control.empty = queue->record->following;
#ifdef DEBUG
        printf (" new empty location %ld\n", queue->record->following);
#endif
        queue->record->following = NULL;
        queue->update++;

        if (queue->control.total_records) {
            /* Link the previous tail record to the new record. */
            if (get_buffer (queue, previous)) {
                REC_POINTER(previous)->following = following;
                /* Mark the buffer modified: get_buffer clears the flag
                   when it loads a different block, and without this the
                   relinked tail would not be written back. */
                queue->update++;
            }
            else
                return (FAILURE);
#ifdef DEBUG
            printf ("RQ_enqueue: previous tail linked to new record\n");
#endif
        }
        else
            queue->control.head = queue->control.tail;  /* First record */

        queue->record_number = queue->control.total_records++;
        if (put_queue (queue))  /* Update the record & queue control */
            get_buffer (queue, following);
    }
    return (IO_ERROR ?
        FAILURE : SUCCESS);
}


INTEGER
RQ_dequeue (queue, data, amount)
QUEUE   *queue;
STRING  data;
INTEGER amount;
{
    STRING record;

    if (amount > RECORD_SIZE) {
        IO_ERROR = IE_BAD;
        return (FAILURE);
    }

    if ((record = RQ_get (queue, 0))) {
        copy (data, record, amount);
#ifdef DEBUG
        printf ("RQ_dequeue: record 0 data copied\n");
#endif
        RQ_delete (queue, 0);
    }
    return (IO_ERROR ? FAILURE : SUCCESS);
}


INTEGER
RQ_delete (queue, record_number)
QUEUE   *queue;
UCOUNT  record_number;
{
    LONG previous, following;

    if (RQ_get (queue, record_number)) {
        previous = queue->record->previous;
        following = queue->record->following;

        /* Put the deleted record on the front of the empty queue. */
        queue->record->following = queue->control.empty;
#ifdef DEBUG
        printf ("RQ_delete:\n\
    queue->control.empty = %ld\n\
    queue->location = %ld\n\
    queue->record = %u\n\
    &queue->buffer = %u\n",
            queue->control.empty, queue->location,
            queue->record, &queue->buffer);
#endif
        queue->control.empty = queue->location +
            ((ADDRESS)queue->record - (ADDRESS)&queue->buffer);
#ifdef DEBUG
        printf ("RQ_delete: record %u is unlinked to empty at %ld\n",
            record_number, queue->control.empty);
#endif
        queue->update++;

        if (--queue->control.total_records) {
            if (previous != NULL) {
                if (get_buffer (queue, previous)) {
                    (queue->record = REC_POINTER(previous))->following =
                        following;
                    queue->record_number = record_number - 1;
#ifdef DEBUG
                    printf ("RQ_delete: previous record at %ld linked forward to %ld\n",
                        previous, following);
#endif
                    queue->update++;
                }
                else
                    return (FAILURE);
            }
            else
                queue->control.head = following;

            if (following != NULL) {
                if (get_buffer (queue, following)) {
                    (queue->record = REC_POINTER(following))->previous =
                        previous;
                    queue->record_number = record_number++;
#ifdef DEBUG
                    printf ("RQ_delete: following record at %ld linked back to %ld\n",
                        following, previous);
#endif
                    queue->update++;
                }
                else
                    return (FAILURE);
            }
            else
                queue->control.tail = previous;
        }
        else
            queue->control.head = queue->control.tail = queue->record = NULL;
        put_queue
            (queue);
    }
    return (IO_ERROR ? FAILURE : SUCCESS);
}


INTEGER
RQ_update (queue)
QUEUE   *queue;
{
#ifdef DEBUG
    printf ("RQ_update: seeking location %ld\n", queue->location);
#endif
    fseek (queue->file, queue->location, 0);
    fput (&queue->buffer, Q_BUF_SIZE, queue->file);
#ifdef DEBUG
    printf (" fput complete\n");
#endif
    queue->update = FALSE;
    return (IO_ERROR ? FAILURE : SUCCESS);
}


INTERNAL
INTEGER
put_queue (queue)
QUEUE   *queue;
{
    LONG location;

    if ((location = queue->location) != NULL &&
        ! get_buffer (queue, 0L))
        return (FAILURE);
    copy (&queue->buffer, &queue->control, sizeof (QUEUE_HEADER));
#ifdef DEBUG
    printf ("put_queue: queue->control copied into buffer\n");
#endif
    if (RQ_update (queue))
        get_buffer (queue, location);
#ifdef DEBUG
    printf ("put_queue: update and location return complete\n");
#endif
    return (IO_ERROR ? FAILURE : SUCCESS);
}


INTERNAL
INTEGER
get_buffer (queue, location)
QUEUE   *queue;
LONG    location;   /* Location in file of desired buffer */
{
    IO_ERROR = 0;

    /* >>> NOTE <<< Nothing happens if the requested location
       is already buffered. */
    if ((location -= location % Q_BUF_SIZE) != queue->location) {
        if (queue->update && ! RQ_update (queue))
            return (FAILURE);
#ifdef DEBUG
        printf ("get_buffer: seeking location %ld\n", location);
#endif
        if (! fseek (queue->file, location, 0)) {
            fget (&queue->buffer, Q_BUF_SIZE, queue->file);
#ifdef DEBUG
            printf (" fget complete\n");
#endif
            if (IO_ERROR == IE_EOF)
                new_block (queue);
            if (! IO_ERROR)
                queue->location = location;
        }
    }
    return (IO_ERROR ? FAILURE : SUCCESS);
}


INTERNAL
INTEGER
new_block (queue)
QUEUE   *queue;
{
    REGISTER count, size;
    RECORD_HEADER *record;
    LONG location;

    IO_ERROR = 0;

    if (queue->control.empty == BUF_LOCATION(queue->end)) {
        /*  A new block can be added to the queue file ONLY when all
            other empty space has been exhausted (i.e. empty location
            is EOF).

            >>> CAUTION <<< The current buffer is cleared and initialized
            as a list of empty records. Update the buffer before using
            this function.
        */
        zero (&queue->buffer, Q_BUF_SIZE);
#ifdef DEBUG
        printf ("new_block: queue->end = %ld, record_size = %d\n",
            queue->end, queue->control.record_size);
#endif
        for (
            location = queue->end,
            record = &queue->buffer,
            count = Q_BUF_SIZE / (size = queue->control.record_size);
            count;
            count--,
            (ADDRESS)record += size
            ) {
            location += size;
#ifdef DEBUG
            printf (" next empty at location %ld\n", location);
#endif
            record->following = location;
        }
        queue->location = queue->end;
        queue->end = location;
        queue->record = NULL;   /* No current record */
        queue->update++;
    }
    else
        IO_ERROR = IE_RAC;  /* Illegal Record Access */

    return (IO_ERROR ? FAILURE : SUCCESS);
}