Row level lock code for the archive storage engine.
sql/examples/ha_archive.h: Updates for new meta file. sql/examples/ha_archive.cc: Removed debug statement.
This commit is contained in:
parent
cd1cd0df72
commit
9da04daa00
@ -42,7 +42,18 @@
|
||||
handle bulk inserts as well (that is if someone was trying to read at
|
||||
the same time since we would want to flush).
|
||||
|
||||
No attempts at durability are made. You can corrupt your data.
|
||||
A "meta" file is kept. All this file does is contain information on
|
||||
the number of rows.
|
||||
|
||||
No attempts at durability are made. You can corrupt your data. A repair
|
||||
method was added to repair the meta file that stores row information,
|
||||
but if your data file gets corrupted I haven't solved that. I could
|
||||
create a repair that would solve this, but do you want to take a
|
||||
chance of loosing your data?
|
||||
|
||||
Locks are row level, and you will get a consistant read. Transactions
|
||||
will be added later (they are not that hard to add at this
|
||||
stage).
|
||||
|
||||
For performance as far as table scans go it is quite fast. I don't have
|
||||
good numbers but locally it has out performed both Innodb and MyISAM. For
|
||||
@ -71,14 +82,35 @@
|
||||
Add truncate table command.
|
||||
Implement versioning, should be easy.
|
||||
Allow for errors, find a way to mark bad rows.
|
||||
See if during an optimize you can make the table smaller.
|
||||
Talk to the gzip guys, come up with a writable format so that updates are doable
|
||||
without switching to a block method.
|
||||
Add optional feature so that rows can be flushed at interval (which will cause less
|
||||
compression but may speed up ordered searches).
|
||||
compression but may speed up ordered searches).
|
||||
Checkpoint the meta file to allow for faster rebuilds.
|
||||
Dirty open (right now the meta file is repaired if a crash occured).
|
||||
Transactions.
|
||||
Option to allow for dirty reads, this would lower the sync calls, which would make
|
||||
inserts a lot faster, but would mean highly arbitrary reads.
|
||||
|
||||
-Brian
|
||||
*/
|
||||
/*
|
||||
Notes on file formats.
|
||||
The Meta file is layed out as:
|
||||
check - Just an int of 254 to make sure that the the file we are opening was
|
||||
never corrupted.
|
||||
version - The current version of the file format.
|
||||
rows - This is an unsigned long long which is the number of rows in the data
|
||||
file.
|
||||
check point - Reserved for future use
|
||||
dirty - Status of the file, whether or not its values are the latest. This flag
|
||||
is what causes a repair to occur
|
||||
|
||||
The data file:
|
||||
check - Just an int of 254 to make sure that the the file we are opening was
|
||||
never corrupted.
|
||||
version - The current version of the file format.
|
||||
data - The data is stored in a "row +blobs" format.
|
||||
|
||||
/* Variables for archive share methods */
|
||||
pthread_mutex_t archive_mutex;
|
||||
@ -86,8 +118,11 @@ static HASH archive_open_tables;
|
||||
static int archive_init= 0;
|
||||
|
||||
/* The file extension */
|
||||
#define ARZ ".ARZ"
|
||||
#define ARN ".ARN"
|
||||
#define ARZ ".ARZ" // The data file
|
||||
#define ARN ".ARN" // Files used during an optimize call
|
||||
#define ARM ".ARM" // Meta file
|
||||
#define META_BUFFER_SIZE 24 // Size of the data used in the meta file
|
||||
#define CHECK_HEADER 254 // The number we use to determine corruption
|
||||
|
||||
/*
|
||||
Used for hash table that tracks open tables.
|
||||
@ -99,14 +134,132 @@ static byte* archive_get_key(ARCHIVE_SHARE *share,uint *length,
|
||||
return (byte*) share->table_name;
|
||||
}
|
||||
|
||||
/*
|
||||
This method reads the header of a datafile and returns whether or not it was successful.
|
||||
*/
|
||||
int ha_archive::read_data_header(gzFile file_to_read)
|
||||
{
|
||||
int check; // We use this to check the header
|
||||
|
||||
DBUG_ENTER("ha_archive::read_data_header");
|
||||
|
||||
if (gzrewind(file_to_read) == -1)
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
|
||||
if (gzread(file_to_read, &check, sizeof(int)) != sizeof(int))
|
||||
DBUG_RETURN(errno ? errno : -1);
|
||||
if (check != CHECK_HEADER)
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
if (gzread(file_to_read, &version, sizeof(version)) != sizeof(version))
|
||||
DBUG_RETURN(errno ? errno : -1);
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
Example of simple lock controls.
|
||||
See ha_example.cc for a description.
|
||||
This method writes out the header of a datafile and returns whether or not it was successful.
|
||||
*/
|
||||
static ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table)
|
||||
int ha_archive::write_data_header(gzFile file_to_write)
|
||||
{
|
||||
int check= CHECK_HEADER;
|
||||
DBUG_ENTER("ha_archive::write_data_header");
|
||||
|
||||
if (gzwrite(file_to_write, &check, sizeof(int)) != sizeof(int))
|
||||
goto error;
|
||||
if (gzwrite(file_to_write, &version, sizeof(int)) != sizeof(version))
|
||||
goto error;
|
||||
|
||||
DBUG_RETURN(0);
|
||||
error:
|
||||
DBUG_RETURN(errno);
|
||||
}
|
||||
|
||||
/*
|
||||
This method reads the header of a meta file and returns whether or not it was successful.
|
||||
*rows will contain the current number of rows in the data file upon success.
|
||||
*/
|
||||
int ha_archive::read_meta_file(File meta_file, ulonglong *rows)
|
||||
{
|
||||
size_t size= sizeof(ulonglong) + sizeof(version) + sizeof(bool); // calculate
|
||||
byte meta_buffer[META_BUFFER_SIZE];
|
||||
bool dirty;
|
||||
int check;
|
||||
ulonglong check_point;
|
||||
|
||||
DBUG_ENTER("ha_archive::read_meta_file");
|
||||
|
||||
/*
|
||||
Format of the meta file is:
|
||||
version
|
||||
number of rows
|
||||
byte showing if the file was stored
|
||||
*/
|
||||
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
|
||||
if (my_read(meta_file, meta_buffer, size, MYF(MY_WME | MY_NABP)))
|
||||
DBUG_RETURN(-1);
|
||||
|
||||
/*
|
||||
Parse out the meta data, we ignore version at the moment
|
||||
*/
|
||||
memcpy(&check, meta_buffer + sizeof(int), sizeof(int));
|
||||
longlongstore(rows, meta_buffer + sizeof(version) + sizeof(int));
|
||||
longlongstore(&check_point, meta_buffer + sizeof(version) + sizeof(int) +
|
||||
sizeof(ulonglong));
|
||||
memcpy(&dirty, meta_buffer+sizeof(ulonglong) + sizeof(version) +
|
||||
sizeof(ulonglong) + sizeof(int), sizeof(bool));
|
||||
|
||||
if (dirty == TRUE)
|
||||
DBUG_RETURN(-1);
|
||||
|
||||
my_sync(meta_file, MYF(MY_WME));
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
This method writes out the header of a meta file and returns whether or not it was successful.
|
||||
By setting dirty you say whether or not the file represents the actual state of the data file.
|
||||
Upon ::open() we set to dirty, and upon ::close() we set to clean. If we determine during
|
||||
a read that the file was dirty we will force a rebuild of this file.
|
||||
*/
|
||||
int ha_archive::write_meta_file(File meta_file, ulonglong rows, bool dirty)
|
||||
{
|
||||
char meta_buffer[META_BUFFER_SIZE];
|
||||
ulonglong check_port= 0;
|
||||
size_t size= sizeof(ulonglong) + sizeof(version) + sizeof(bool) +
|
||||
sizeof(ulonglong); // calculate length of data
|
||||
DBUG_ENTER("ha_archive::write_meta_file");
|
||||
|
||||
/*
|
||||
Format of the meta file is:
|
||||
version
|
||||
number of rows
|
||||
byte showing if the file was stored
|
||||
*/
|
||||
version= ARCHIVE_VERSION;
|
||||
memcpy(meta_buffer, &version, sizeof(version));
|
||||
longlongstore(meta_buffer + sizeof(version), rows); // Position past version
|
||||
longlongstore(meta_buffer + sizeof(version) + sizeof(ulonglong), check_port);
|
||||
memcpy(meta_buffer+sizeof(ulonglong) + sizeof(version) + + sizeof(ulonglong), &dirty, sizeof(bool));
|
||||
|
||||
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
|
||||
if (my_write(meta_file, meta_buffer, size, MYF(MY_WME | MY_NABP)))
|
||||
DBUG_RETURN(-1);
|
||||
|
||||
my_sync(meta_file, MYF(MY_WME));
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
We create the shared memory space that we will use for the open table.
|
||||
See ha_example.cc for a longer description.
|
||||
*/
|
||||
ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table)
|
||||
{
|
||||
ARCHIVE_SHARE *share;
|
||||
char meta_file_name[FN_REFLEN];
|
||||
uint length;
|
||||
char *tmp_name;
|
||||
|
||||
@ -143,33 +296,62 @@ static ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
share->use_count=0;
|
||||
share->table_name_length=length;
|
||||
share->table_name=tmp_name;
|
||||
share->use_count= 0;
|
||||
share->table_name_length= length;
|
||||
share->table_name= tmp_name;
|
||||
fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
|
||||
fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
|
||||
strmov(share->table_name,table_name);
|
||||
/*
|
||||
We will use this lock for rows.
|
||||
*/
|
||||
VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
|
||||
if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1)
|
||||
goto error;
|
||||
|
||||
if (read_meta_file(share->meta_file, &share->rows_recorded))
|
||||
{
|
||||
/*
|
||||
The problem here is that for some reason, probably a crash, the meta
|
||||
file has been corrupted. So what do we do? Well we try to rebuild it
|
||||
ourself. Once that happens, we reread it, but if that fails we just
|
||||
call it quits and return an error.
|
||||
*/
|
||||
if (rebuild_meta_file(share->table_name, share->meta_file))
|
||||
goto error;
|
||||
if (read_meta_file(share->meta_file, &share->rows_recorded))
|
||||
goto error;
|
||||
}
|
||||
/*
|
||||
After we read, we set the file to dirty. When we close, we will do the
|
||||
opposite.
|
||||
*/
|
||||
(void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
|
||||
/*
|
||||
It is expensive to open and close the data files and since you can't have
|
||||
a gzip file that can be both read and written we keep a writer open
|
||||
that is shared amoung all open tables.
|
||||
*/
|
||||
if ((share->archive_write= gzopen(share->data_file_name, "ab")) == NULL)
|
||||
goto error;
|
||||
goto error2;
|
||||
if (my_hash_insert(&archive_open_tables, (byte*) share))
|
||||
goto error;
|
||||
goto error2;
|
||||
thr_lock_init(&share->lock);
|
||||
if (pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST))
|
||||
goto error2;
|
||||
goto error3;
|
||||
}
|
||||
share->use_count++;
|
||||
pthread_mutex_unlock(&archive_mutex);
|
||||
|
||||
return share;
|
||||
|
||||
error2:
|
||||
error3:
|
||||
VOID(pthread_mutex_destroy(&share->mutex));
|
||||
thr_lock_delete(&share->lock);
|
||||
/* We close, but ignore errors since we already have errors */
|
||||
(void)gzclose(share->archive_write);
|
||||
error2:
|
||||
my_close(share->meta_file,MYF(0));
|
||||
error:
|
||||
pthread_mutex_unlock(&archive_mutex);
|
||||
my_free((gptr) share, MYF(0));
|
||||
@ -179,10 +361,10 @@ error:
|
||||
|
||||
|
||||
/*
|
||||
Free lock controls.
|
||||
Free the share.
|
||||
See ha_example.cc for a description.
|
||||
*/
|
||||
static int free_share(ARCHIVE_SHARE *share)
|
||||
int ha_archive::free_share(ARCHIVE_SHARE *share)
|
||||
{
|
||||
int rc= 0;
|
||||
pthread_mutex_lock(&archive_mutex);
|
||||
@ -190,7 +372,8 @@ static int free_share(ARCHIVE_SHARE *share)
|
||||
{
|
||||
hash_delete(&archive_open_tables, (byte*) share);
|
||||
thr_lock_delete(&share->lock);
|
||||
pthread_mutex_destroy(&share->mutex);
|
||||
VOID(pthread_mutex_destroy(&share->mutex));
|
||||
(void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
|
||||
if (gzclose(share->archive_write) == Z_ERRNO)
|
||||
rc= 1;
|
||||
my_free((gptr) share, MYF(0));
|
||||
@ -205,7 +388,7 @@ static int free_share(ARCHIVE_SHARE *share)
|
||||
We just implement one additional file extension.
|
||||
*/
|
||||
const char **ha_archive::bas_ext() const
|
||||
{ static const char *ext[]= { ARZ, ARN, NullS }; return ext; }
|
||||
{ static const char *ext[]= { ARZ, ARN, ARM, NullS }; return ext; }
|
||||
|
||||
|
||||
/*
|
||||
@ -213,7 +396,6 @@ const char **ha_archive::bas_ext() const
|
||||
Create/get our shared structure.
|
||||
Init out lock.
|
||||
We open the file we will read from.
|
||||
Set the size of ref_length.
|
||||
*/
|
||||
int ha_archive::open(const char *name, int mode, uint test_if_locked)
|
||||
{
|
||||
@ -284,36 +466,58 @@ int ha_archive::close(void)
|
||||
int ha_archive::create(const char *name, TABLE *table_arg,
|
||||
HA_CREATE_INFO *create_info)
|
||||
{
|
||||
File create_file;
|
||||
File create_file; // We use to create the datafile and the metafile
|
||||
char name_buff[FN_REFLEN];
|
||||
size_t written;
|
||||
int error;
|
||||
DBUG_ENTER("ha_archive::create");
|
||||
|
||||
/*
|
||||
Right now version for the meta file and the data file is the same.
|
||||
*/
|
||||
version= ARCHIVE_VERSION;
|
||||
|
||||
if ((create_file= my_create(fn_format(name_buff,name,"",ARM,
|
||||
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
|
||||
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
|
||||
{
|
||||
error= my_errno;
|
||||
goto error;
|
||||
}
|
||||
write_meta_file(create_file, 0, FALSE);
|
||||
my_close(create_file,MYF(0));
|
||||
|
||||
/*
|
||||
We reuse name_buff since it is available.
|
||||
*/
|
||||
if ((create_file= my_create(fn_format(name_buff,name,"",ARZ,
|
||||
MY_REPLACE_EXT|MY_UNPACK_FILENAME),0,
|
||||
O_RDWR | O_TRUNC,MYF(MY_WME))) < 0)
|
||||
{
|
||||
error= my_errno;
|
||||
goto err;
|
||||
goto error;
|
||||
}
|
||||
if ((archive= gzdopen(create_file, "ab")) == NULL)
|
||||
{
|
||||
error= errno;
|
||||
delete_table(name);
|
||||
goto err;
|
||||
goto error;
|
||||
}
|
||||
version= ARCHIVE_VERSION;
|
||||
written= gzwrite(archive, &version, sizeof(version));
|
||||
if (gzclose(archive) || written != sizeof(version))
|
||||
if (write_data_header(archive))
|
||||
{
|
||||
error= errno;
|
||||
delete_table(name);
|
||||
goto err;
|
||||
gzclose(archive);
|
||||
goto error2;
|
||||
}
|
||||
|
||||
if (gzclose(archive))
|
||||
goto error2;
|
||||
|
||||
DBUG_RETURN(0);
|
||||
|
||||
err:
|
||||
error2:
|
||||
error= errno;
|
||||
delete_table(name);
|
||||
error:
|
||||
/* Return error number, if we got one */
|
||||
DBUG_RETURN(error ? error : -1);
|
||||
}
|
||||
@ -330,30 +534,41 @@ err:
|
||||
*/
|
||||
int ha_archive::write_row(byte * buf)
|
||||
{
|
||||
char *pos;
|
||||
z_off_t written;
|
||||
DBUG_ENTER("ha_archive::write_row");
|
||||
|
||||
statistic_increment(ha_write_count,&LOCK_status);
|
||||
if (table->timestamp_default_now)
|
||||
update_timestamp(buf+table->timestamp_default_now-1);
|
||||
pthread_mutex_lock(&share->mutex);
|
||||
written= gzwrite(share->archive_write, buf, table->reclength);
|
||||
share->dirty= TRUE;
|
||||
if (written != table->reclength)
|
||||
DBUG_RETURN(errno ? errno : -1);
|
||||
|
||||
goto error;
|
||||
/*
|
||||
We should probably mark the table as damagaged if the record is written
|
||||
but the blob fails.
|
||||
*/
|
||||
for (Field_blob **field=table->blob_field ; *field ; field++)
|
||||
{
|
||||
char *ptr;
|
||||
uint32 size= (*field)->get_length();
|
||||
|
||||
(*field)->get_ptr(&ptr);
|
||||
written= gzwrite(share->archive_write, ptr, (unsigned)size);
|
||||
if (written != size)
|
||||
DBUG_RETURN(errno ? errno : -1);
|
||||
if (size)
|
||||
{
|
||||
(*field)->get_ptr(&ptr);
|
||||
written= gzwrite(share->archive_write, ptr, (unsigned)size);
|
||||
if (written != size)
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
share->rows_recorded++;
|
||||
pthread_mutex_unlock(&share->mutex);
|
||||
|
||||
DBUG_RETURN(0);
|
||||
error:
|
||||
pthread_mutex_unlock(&share->mutex);
|
||||
DBUG_RETURN(errno ? errno : -1);
|
||||
}
|
||||
|
||||
|
||||
@ -369,37 +584,28 @@ int ha_archive::rnd_init(bool scan)
|
||||
int read; // gzread() returns int, and we use this to check the header
|
||||
|
||||
/* We rewind the file so that we can read from the beginning if scan */
|
||||
if(scan)
|
||||
{
|
||||
records= 0;
|
||||
if (gzrewind(archive))
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
}
|
||||
|
||||
/*
|
||||
If dirty, we lock, and then reset/flush the data.
|
||||
I found that just calling gzflush() doesn't always work.
|
||||
*/
|
||||
if (share->dirty == TRUE)
|
||||
{
|
||||
pthread_mutex_lock(&share->mutex);
|
||||
if (share->dirty == TRUE)
|
||||
{
|
||||
gzflush(share->archive_write, Z_SYNC_FLUSH);
|
||||
share->dirty= FALSE;
|
||||
}
|
||||
pthread_mutex_unlock(&share->mutex);
|
||||
}
|
||||
|
||||
/*
|
||||
At the moment we just check the size of version to make sure the header is
|
||||
intact.
|
||||
*/
|
||||
if (scan)
|
||||
{
|
||||
read= gzread(archive, &version, sizeof(version));
|
||||
if (read != sizeof(version))
|
||||
DBUG_RETURN(errno ? errno : -1);
|
||||
scan_rows= share->rows_recorded;
|
||||
records= 0;
|
||||
|
||||
/*
|
||||
If dirty, we lock, and then reset/flush the data.
|
||||
I found that just calling gzflush() doesn't always work.
|
||||
*/
|
||||
if (share->dirty == TRUE)
|
||||
{
|
||||
pthread_mutex_lock(&share->mutex);
|
||||
if (share->dirty == TRUE)
|
||||
{
|
||||
gzflush(share->archive_write, Z_SYNC_FLUSH);
|
||||
share->dirty= FALSE;
|
||||
}
|
||||
pthread_mutex_unlock(&share->mutex);
|
||||
}
|
||||
|
||||
if (read_data_header(archive))
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
}
|
||||
|
||||
DBUG_RETURN(0);
|
||||
@ -410,14 +616,19 @@ int ha_archive::rnd_init(bool scan)
|
||||
This is the method that is used to read a row. It assumes that the row is
|
||||
positioned where you want it.
|
||||
*/
|
||||
int ha_archive::get_row(byte *buf)
|
||||
int ha_archive::get_row(gzFile file_to_read, byte *buf)
|
||||
{
|
||||
int read; // Bytes read, gzread() returns int
|
||||
char *last;
|
||||
size_t total_blob_length= 0;
|
||||
Field_blob **field;
|
||||
DBUG_ENTER("ha_archive::get_row");
|
||||
|
||||
read= gzread(archive, buf, table->reclength);
|
||||
read= gzread(file_to_read, buf, table->reclength);
|
||||
DBUG_PRINT("ha_archive::get_row", ("Read %d bytes", read));
|
||||
|
||||
if (read == Z_STREAM_ERROR)
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
|
||||
/* If we read nothing we are at the end of the file */
|
||||
if (read == 0)
|
||||
@ -428,7 +639,7 @@ int ha_archive::get_row(byte *buf)
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
|
||||
/* Calculate blob length, we use this for our buffer */
|
||||
for (Field_blob **field=table->blob_field; *field ; field++)
|
||||
for (field=table->blob_field; *field ; field++)
|
||||
total_blob_length += (*field)->get_length();
|
||||
|
||||
/* Adjust our row buffer if we need be */
|
||||
@ -436,14 +647,17 @@ int ha_archive::get_row(byte *buf)
|
||||
last= (char *)buffer.ptr();
|
||||
|
||||
/* Loop through our blobs and read them */
|
||||
for (Field_blob **field=table->blob_field; *field ; field++)
|
||||
for (field=table->blob_field; *field ; field++)
|
||||
{
|
||||
size_t size= (*field)->get_length();
|
||||
read= gzread(archive, last, size);
|
||||
if ((size_t) read != size)
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
(*field)->set_ptr(size, last);
|
||||
last += size;
|
||||
if (size)
|
||||
{
|
||||
read= gzread(file_to_read, last, size);
|
||||
if ((size_t) read != size)
|
||||
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
|
||||
(*field)->set_ptr(size, last);
|
||||
last += size;
|
||||
}
|
||||
}
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
@ -459,9 +673,15 @@ int ha_archive::rnd_next(byte *buf)
|
||||
int rc;
|
||||
DBUG_ENTER("ha_archive::rnd_next");
|
||||
|
||||
if (!scan_rows)
|
||||
DBUG_RETURN(HA_ERR_END_OF_FILE);
|
||||
scan_rows--;
|
||||
|
||||
statistic_increment(ha_read_rnd_next_count,&LOCK_status);
|
||||
current_position= gztell(archive);
|
||||
rc= get_row(buf);
|
||||
rc= get_row(archive, buf);
|
||||
|
||||
|
||||
if (rc != HA_ERR_END_OF_FILE)
|
||||
records++;
|
||||
|
||||
@ -474,6 +694,7 @@ int ha_archive::rnd_next(byte *buf)
|
||||
each call to ha_archive::rnd_next() if an ordering of the rows is
|
||||
needed.
|
||||
*/
|
||||
|
||||
void ha_archive::position(const byte *record)
|
||||
{
|
||||
DBUG_ENTER("ha_archive::position");
|
||||
@ -496,13 +717,70 @@ int ha_archive::rnd_pos(byte * buf, byte *pos)
|
||||
current_position= ha_get_ptr(pos, ref_length);
|
||||
z_off_t seek= gzseek(archive, current_position, SEEK_SET);
|
||||
|
||||
DBUG_RETURN(get_row(buf));
|
||||
DBUG_RETURN(get_row(archive, buf));
|
||||
}
|
||||
|
||||
/*
|
||||
This method rebuilds the meta file. It does this by walking the datafile and
|
||||
rewriting the meta file.
|
||||
*/
|
||||
int ha_archive::rebuild_meta_file(char *table_name, File meta_file)
|
||||
{
|
||||
int rc;
|
||||
byte *buf;
|
||||
ulonglong rows_recorded= 0;
|
||||
gzFile rebuild_file; /* Archive file we are working with */
|
||||
char data_file_name[FN_REFLEN];
|
||||
DBUG_ENTER("ha_archive::rebuild_meta_file");
|
||||
|
||||
/*
|
||||
Open up the meta file to recreate it.
|
||||
*/
|
||||
fn_format(data_file_name, table_name, "", ARZ,
|
||||
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
|
||||
if ((rebuild_file= gzopen(data_file_name, "rb")) == NULL)
|
||||
DBUG_RETURN(errno ? errno : -1);
|
||||
|
||||
if (rc= read_data_header(rebuild_file))
|
||||
goto error;
|
||||
|
||||
/*
|
||||
We malloc up the buffer we will use for counting the rows.
|
||||
I know, this malloc'ing memory but this should be a very
|
||||
rare event.
|
||||
*/
|
||||
if (!(buf= (byte*) my_malloc(table->rec_buff_length > sizeof(ulonglong) +1 ?
|
||||
table->rec_buff_length : sizeof(ulonglong) +1 ,
|
||||
MYF(MY_WME))))
|
||||
{
|
||||
rc= HA_ERR_CRASHED_ON_USAGE;
|
||||
goto error;
|
||||
}
|
||||
|
||||
while (!(rc= get_row(rebuild_file, buf)))
|
||||
rows_recorded++;
|
||||
|
||||
/*
|
||||
Only if we reach the end of the file do we assume we can rewrite.
|
||||
At this point we reset rc to a non-message state.
|
||||
*/
|
||||
if (rc == HA_ERR_END_OF_FILE)
|
||||
{
|
||||
(void)write_meta_file(meta_file, rows_recorded, FALSE);
|
||||
rc= 0;
|
||||
}
|
||||
|
||||
my_free((gptr) buf, MYF(0));
|
||||
error:
|
||||
gzclose(rebuild_file);
|
||||
|
||||
DBUG_RETURN(rc);
|
||||
}
|
||||
|
||||
/*
|
||||
The table can become fragmented if data was inserted, read, and then
|
||||
inserted again. What we do is open up the file and recompress it completely.
|
||||
*/
|
||||
*/
|
||||
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
|
||||
{
|
||||
DBUG_ENTER("ha_archive::optimize");
|
||||
@ -512,7 +790,8 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
|
||||
char writer_filename[FN_REFLEN];
|
||||
|
||||
/* Lets create a file to contain the new data */
|
||||
fn_format(writer_filename,share->table_name,"",ARN, MY_REPLACE_EXT|MY_UNPACK_FILENAME);
|
||||
fn_format(writer_filename, share->table_name, "", ARN,
|
||||
MY_REPLACE_EXT|MY_UNPACK_FILENAME);
|
||||
|
||||
/* Closing will cause all data waiting to be flushed, to be flushed */
|
||||
gzclose(share->archive_write);
|
||||
@ -547,6 +826,59 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
No transactions yet, so this is pretty dull.
|
||||
*/
|
||||
int ha_archive::external_lock(THD *thd, int lock_type)
|
||||
{
|
||||
DBUG_ENTER("ha_archive::external_lock");
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
Below is an example of how to setup row level locking.
|
||||
*/
|
||||
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
|
||||
THR_LOCK_DATA **to,
|
||||
enum thr_lock_type lock_type)
|
||||
{
|
||||
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) {
|
||||
/*
|
||||
Here is where we get into the guts of a row level lock.
|
||||
If TL_UNLOCK is set
|
||||
If we are not doing a LOCK TABLE or DISCARD/IMPORT
|
||||
TABLESPACE, then allow multiple writers
|
||||
*/
|
||||
|
||||
if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
|
||||
lock_type <= TL_WRITE) && !thd->in_lock_tables
|
||||
&& !thd->tablespace_op) {
|
||||
|
||||
lock_type = TL_WRITE_ALLOW_WRITE;
|
||||
}
|
||||
|
||||
/*
|
||||
In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
|
||||
MySQL would use the lock TL_READ_NO_INSERT on t2, and that
|
||||
would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
|
||||
to t2. Convert the lock to a normal read lock to allow
|
||||
concurrent inserts to t2.
|
||||
*/
|
||||
|
||||
if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables) {
|
||||
lock_type = TL_READ;
|
||||
}
|
||||
|
||||
lock.type=lock_type;
|
||||
}
|
||||
|
||||
*to++= &lock;
|
||||
|
||||
return to;
|
||||
}
|
||||
|
||||
|
||||
/******************************************************************************
|
||||
|
||||
Everything below here is default, please look at ha_example.cc for
|
||||
@ -616,8 +948,8 @@ void ha_archive::info(uint flag)
|
||||
DBUG_ENTER("ha_archive::info");
|
||||
|
||||
/* This is a lie, but you don't want the optimizer to see zero or 1 */
|
||||
if (records < 2)
|
||||
records= 2;
|
||||
records= share->rows_recorded;
|
||||
deleted= 0;
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
@ -634,23 +966,6 @@ int ha_archive::reset(void)
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
int ha_archive::external_lock(THD *thd, int lock_type)
|
||||
{
|
||||
DBUG_ENTER("ha_archive::external_lock");
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
|
||||
THR_LOCK_DATA **to,
|
||||
enum thr_lock_type lock_type)
|
||||
{
|
||||
if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
|
||||
lock.type=lock_type;
|
||||
*to++= &lock;
|
||||
return to;
|
||||
}
|
||||
|
||||
ha_rows ha_archive::records_in_range(uint inx, key_range *min_key,
|
||||
key_range *max_key)
|
||||
{
|
||||
|
@ -32,8 +32,10 @@ typedef struct st_archive_share {
|
||||
uint table_name_length,use_count;
|
||||
pthread_mutex_t mutex;
|
||||
THR_LOCK lock;
|
||||
File meta_file; /* Meta file we use */
|
||||
gzFile archive_write; /* Archive file we are working with */
|
||||
bool dirty; /* Flag for if a flush should occur */
|
||||
ulonglong rows_recorded; /* Number of rows in tables */
|
||||
} ARCHIVE_SHARE;
|
||||
|
||||
/*
|
||||
@ -50,7 +52,8 @@ class ha_archive: public handler
|
||||
z_off_t current_position; /* The position of the row we just read */
|
||||
byte byte_buffer[IO_SIZE]; /* Initial buffer for our string */
|
||||
String buffer; /* Buffer used for blob storage */
|
||||
unsigned int version; /* Used for recording version */
|
||||
uint version; /* Used for recording version */
|
||||
ulonglong scan_rows; /* Number of rows left in scan */
|
||||
|
||||
public:
|
||||
ha_archive(TABLE *table): handler(table)
|
||||
@ -104,7 +107,14 @@ public:
|
||||
int rnd_init(bool scan=1);
|
||||
int rnd_next(byte *buf);
|
||||
int rnd_pos(byte * buf, byte *pos);
|
||||
int get_row(byte *buf);
|
||||
int get_row(gzFile file_to_read, byte *buf);
|
||||
int read_meta_file(File meta_file, ulonglong *rows);
|
||||
int write_meta_file(File meta_file, ulonglong rows, bool dirty);
|
||||
ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table);
|
||||
int free_share(ARCHIVE_SHARE *share);
|
||||
int rebuild_meta_file(char *table_name, File meta_file);
|
||||
int read_data_header(gzFile file_to_read);
|
||||
int write_data_header(gzFile file_to_write);
|
||||
void position(const byte *record);
|
||||
void info(uint);
|
||||
int extra(enum ha_extra_function operation);
|
||||
|
Loading…
x
Reference in New Issue
Block a user