SVN commit 609482 by vandenoever: Remove the big strigi lock! In the CLucene backend, I was being conservative about allowing concurrent reads and writes to the index, hence making indexing slower if you were looking at the status. This has now been fixed and it makes a huge speed difference if you are indexing and searching at the same time. M +44 -61 luceneindexer/cluceneindexmanager.cpp M +8 -11 luceneindexer/cluceneindexmanager.h M +62 -30 luceneindexer/cluceneindexreader.cpp M +16 -3 luceneindexer/cluceneindexreader.h M +6 -8 luceneindexer/cluceneindexwriter.cpp M +0 -1 streamindexer/indexerconfiguration.cpp --- trunk/playground/base/strigi/src/luceneindexer/cluceneindexmanager.cpp #609481:609482 @@ -23,14 +23,14 @@ #include "cluceneindexwriter.h" #include "cluceneindexreader.h" #include +#include +#include #include "stgdirent.h" //our dirent compatibility header... uses native if available using namespace lucene::index; using lucene::analysis::standard::StandardAnalyzer; using lucene::store::FSDirectory; -StrigiMutex CLuceneIndexManager::lock; - jstreams::IndexManager* createCLuceneIndexManager(const char* path) { return new CLuceneIndexManager(path); @@ -41,14 +41,11 @@ CLuceneIndexManager::CLuceneIndexManager(const std::string& path) {//: bitsets(this) { ++numberOfManagers; - dblock = &lock; dbdir = path; - indexreader = 0; indexwriter = 0; - version = 0; writer = new CLuceneIndexWriter(this); - reader = new CLuceneIndexReader(this); analyzer = new StandardAnalyzer(); + mtime = 0; //remove any old segments lying around from crashes, etc //writer->cleanUp(); @@ -58,8 +55,10 @@ CLuceneIndexManager::~CLuceneIndexManager() { // close the writer and analyzer delete writer; - delete reader; - closeReader(); + std::map<pthread_t, CLuceneIndexReader*>::iterator r; + for (r = readers.begin(); r != readers.end(); ++r) { + delete r->second; + } closeWriter(); delete analyzer; if (--numberOfManagers == 0) { @@ -69,8 +68,24 @@ } jstreams::IndexReader* CLuceneIndexManager::getIndexReader() { - return 
reader; + return getReader(); } +CLuceneIndexReader* +CLuceneIndexManager::getReader() { + // TODO check if we should update/reopen the reader + pthread_t self = pthread_self(); + CLuceneIndexReader* r; + STRIGI_MUTEX_LOCK(&lock.lock); + r = readers[self]; + STRIGI_MUTEX_UNLOCK(&lock.lock); + if (r == 0) { + r = new CLuceneIndexReader(this, dbdir); + STRIGI_MUTEX_LOCK(&lock.lock); + readers[self] = r; + STRIGI_MUTEX_UNLOCK(&lock.lock); + } + return r; +} jstreams::IndexWriter* CLuceneIndexManager::getIndexWriter() { return writer; @@ -81,53 +96,18 @@ }*/ IndexWriter* CLuceneIndexManager::refWriter() { - STRIGI_MUTEX_LOCK(&dblock->lock); + STRIGI_MUTEX_LOCK(&writelock.lock); if (indexwriter == 0) { - closeReader(); openWriter(); } return indexwriter; } void CLuceneIndexManager::derefWriter() { - STRIGI_MUTEX_UNLOCK(&dblock->lock); + STRIGI_MUTEX_UNLOCK(&writelock.lock); } -IndexReader* -CLuceneIndexManager::refReader() { - STRIGI_MUTEX_LOCK(&dblock->lock); - if (indexreader == 0) { - closeWriter(); - openReader(); - } - return indexreader; -} void -CLuceneIndexManager::derefReader() { - STRIGI_MUTEX_UNLOCK(&dblock->lock); -} -void -CLuceneIndexManager::openReader() { - try { -// printf("reader at %s\n", dbdir.c_str()); - indexreader = IndexReader::open(dbdir.c_str()); - } catch (CLuceneError& err) { - printf("could not create reader: %s\n", err.what()); - } -} -void -CLuceneIndexManager::closeReader() { - if (indexreader == 0) return; - try { - indexreader->close(); - } catch (CLuceneError& err) { - printf("could not close clucene: %s\n", err.what()); - } - delete indexreader; - indexreader = 0; -} -void CLuceneIndexManager::openWriter(bool truncate) { - version++; try { if (!truncate && IndexReader::indexExists(dbdir.c_str())) { if (IndexReader::isLocked(dbdir.c_str())) { @@ -152,20 +132,7 @@ } int CLuceneIndexManager::docCount() { - int count = 0; - STRIGI_MUTEX_LOCK(&dblock->lock); - if (indexwriter) { - count = indexwriter->docCount(); - } else { - if 
(indexreader == 0) { - openReader(); - } - if (indexreader) { - count = indexreader->numDocs(); - } - } - STRIGI_MUTEX_UNLOCK(&dblock->lock); - return count; + return getReader()->reader->numDocs(); } int64_t CLuceneIndexManager::getIndexSize() { @@ -196,10 +163,26 @@ } void CLuceneIndexManager::deleteIndex() { - closeReader(); + // todo: close all readers closeWriter(); openWriter(true); } +time_t +CLuceneIndexManager::getIndexMTime() { + time_t t; + STRIGI_MUTEX_LOCK(&lock.lock); + t = mtime; + STRIGI_MUTEX_UNLOCK(&lock.lock); + return t; +} +void +CLuceneIndexManager::setIndexMTime() { + struct timeval t; + gettimeofday(&t, 0); + STRIGI_MUTEX_LOCK(&lock.lock); + mtime = t.tv_sec; + STRIGI_MUTEX_UNLOCK(&lock.lock); +} std::wstring utf8toucs2(const char*p, const char*e) { wstring ucs2; --- trunk/playground/base/strigi/src/luceneindexer/cluceneindexmanager.h #609481:609482 @@ -46,37 +46,34 @@ class CLuceneIndexWriter; class CLuceneIndexManager : public jstreams::IndexManager { private: - StrigiMutex* dblock; - static StrigiMutex lock; + StrigiMutex writelock; + StrigiMutex lock; std::string dbdir; - CLuceneIndexReader* reader; + std::map<pthread_t, CLuceneIndexReader*> readers; CLuceneIndexWriter* writer; lucene::index::IndexWriter* indexwriter; - lucene::index::IndexReader* indexreader; //jstreams::QueryBitsetCache bitsets; lucene::analysis::Analyzer* analyzer; - int version; + time_t mtime; static int numberOfManagers; - void openReader(); - void closeReader(); void openWriter(bool truncate=false); - void closeWriter(); public: explicit CLuceneIndexManager(const std::string& path); ~CLuceneIndexManager(); lucene::index::IndexWriter* refWriter(); void derefWriter(); - lucene::index::IndexReader* refReader(); - void derefReader(); jstreams::IndexReader* getIndexReader(); jstreams::IndexWriter* getIndexWriter(); + CLuceneIndexReader* getReader(); // jstreams::QueryBitsetCache* getBitSets(); int32_t docCount(); int64_t getIndexSize(); - int getVersion() const { return version; } void deleteIndex(); 
+ void closeWriter(); + time_t getIndexMTime(); + void setIndexMTime(); }; jstreams::IndexManager* --- trunk/playground/base/strigi/src/luceneindexer/cluceneindexreader.cpp #609481:609482 @@ -64,12 +64,53 @@ jstreams::IndexedDocument&); }; -CLuceneIndexReader::CLuceneIndexReader(CLuceneIndexManager* m) - :manager(m), countversion(-1) { +CLuceneIndexReader::CLuceneIndexReader(CLuceneIndexManager* m, + const string& dir) :manager(m), dbdir(dir), otime(0), reader(0) { + openReader(); } CLuceneIndexReader::~CLuceneIndexReader() { + closeReader(); } +void +CLuceneIndexReader::openReader() { + doccount = -1; + wordcount = -1; + try { +// printf("reader at %s\n", dbdir.c_str()); + reader = lucene::index::IndexReader::open(dbdir.c_str()); + } catch (CLuceneError& err) { + printf("could not create reader: %s\n", err.what()); + reader = 0; + } +} +void +CLuceneIndexReader::closeReader() { + if (reader == 0) return; + try { + reader->close(); + } catch (CLuceneError& err) { + printf("could not close clucene: %s\n", err.what()); + } + delete reader; + reader = 0; +} +bool +CLuceneIndexReader::checkReader(bool enforceCurrent) { + if (manager->getIndexMTime() > otime) { + struct timeval t; + gettimeofday(&t, 0); + if (enforceCurrent || t.tv_sec-otime > 60) { + fprintf(stderr, "reopening reader.\n"); + otime = t.tv_sec; + closeReader(); + } + } + if (reader == 0) { + openReader(); + } + return reader; +} #ifdef _UCS2 typedef map CLuceneIndexReaderFieldMapType; @@ -207,11 +248,10 @@ } int32_t CLuceneIndexReader::countHits(const Query& q) { + if (!checkReader()) return -1; BooleanQuery bq; Private::createBooleanQuery(q, bq); - lucene::index::IndexReader* reader = manager->refReader(); if (reader == 0) { - manager->derefReader(); return 0; } IndexSearcher searcher(reader); @@ -250,7 +290,6 @@ delete hits; } searcher.close(); - manager->derefReader(); return s; } std::vector @@ -258,9 +297,7 @@ BooleanQuery bq; Private::createBooleanQuery(q, bq); std::vector results; - 
lucene::index::IndexReader* reader = manager->refReader(); - if (reader == 0) { - manager->derefReader(); + if (!checkReader()) { return results; } IndexSearcher searcher(reader); @@ -293,15 +330,12 @@ _CLDELETE(hits); } searcher.close(); - manager->derefReader(); return results; } std::map CLuceneIndexReader::getFiles(char depth) { std::map files; - lucene::index::IndexReader* reader = manager->refReader(); - if (reader == 0) { - manager->derefReader(); + if (!checkReader()) { return files; } @@ -325,28 +359,28 @@ _CLDELETE(d); } _CLDELETE(docs); - manager->derefReader(); return files; } int32_t CLuceneIndexReader::countDocuments() { - return manager->docCount(); + if (!checkReader()) return -1; + if (doccount == -1) { + doccount = manager->docCount(); + } + return doccount; } int32_t CLuceneIndexReader::countWords() { - if (manager->getVersion() == countversion) { - return count; + if (!checkReader()) return -1; + if (wordcount == -1) { + if (reader) { + wordcount = 0; + lucene::index::TermEnum *terms = reader->terms(); + while (terms->next()) wordcount++; + _CLDELETE(terms); + } } - count = 0; - countversion = manager->getVersion(); - lucene::index::IndexReader* reader = manager->refReader(); - if (reader) { - lucene::index::TermEnum *terms = reader->terms(); - while (terms->next()) count++; - _CLDELETE(terms); - } - manager->derefReader(); - return count; + return wordcount; } int64_t CLuceneIndexReader::getIndexSize() { @@ -354,7 +388,7 @@ } int64_t CLuceneIndexReader::getDocumentId(const std::string& uri) { - lucene::index::IndexReader* reader = manager->refReader(); + if (!checkReader()) return -1; int64_t id = -1; TCHAR tstr[CL_MAX_DIR]; @@ -370,7 +404,6 @@ id = -1; } - manager->derefReader(); return id; } /** @@ -380,7 +413,7 @@ time_t CLuceneIndexReader::getMTime(int64_t docid) { if (docid < 0) return 0; - lucene::index::IndexReader* reader = manager->refReader(); + if (!checkReader(true)) return 0; time_t mtime = 0; Document *d = reader->document(docid); 
if (d) { @@ -390,6 +423,5 @@ mtime = atoi(cstr); delete d; } - manager->derefReader(); return mtime; } --- trunk/playground/base/strigi/src/luceneindexer/cluceneindexreader.h #609481:609482 @@ -22,6 +22,13 @@ #include "indexreader.h" #include +#include +#include +namespace lucene { + namespace index { + class IndexReader; + } +} class CLuceneIndexManager; class CLuceneIndexReader : public jstreams::IndexReader { @@ -29,13 +36,18 @@ private: CLuceneIndexManager* manager; class Private; - int countversion; - int32_t count; + int32_t wordcount; + int32_t doccount; + const std::string dbdir; + time_t otime; - CLuceneIndexReader(CLuceneIndexManager* m); + CLuceneIndexReader(CLuceneIndexManager* m, const std::string& dbdir); ~CLuceneIndexReader(); static const TCHAR* mapId(const wchar_t* id); static std::wstring mapId(const char* id); + void openReader(); + void closeReader(); + bool checkReader(bool ensureCurrent = false); friend class CLuceneIndexReader::Private; public: @@ -48,6 +60,7 @@ int64_t getDocumentId(const std::string& uri); time_t getMTime(int64_t docid); static void addMapping(const TCHAR* from, const TCHAR* to); + lucene::index::IndexReader* reader; }; #endif --- trunk/playground/base/strigi/src/luceneindexer/cluceneindexwriter.cpp #609481:609482 @@ -21,6 +21,7 @@ #include #include #include "cluceneindexwriter.h" +#include "cluceneindexreader.h" #include "cluceneindexmanager.h" #include "stringreader.h" #include "inputstreamreader.h" @@ -178,16 +179,19 @@ delete doc; if ( sr ) delete sr; + manager->setIndexMTime(); } void CLuceneIndexWriter::deleteEntries(const std::vector& entries) { + manager->closeWriter(); for (uint i=0; isetIndexMTime(); } void CLuceneIndexWriter::deleteEntry(const string& entry) { - lucene::index::IndexReader* reader = manager->refReader(); + lucene::index::IndexReader* reader = manager->getReader()->reader; wstring tstr(utf8toucs2(entry)); Term term(_T("path"), tstr.c_str()); @@ -208,8 +212,6 @@ } _CLDELETE(bits); } - - 
manager->derefReader(); } void CLuceneIndexWriter::deleteAllEntries() { @@ -275,9 +277,8 @@ //remove all unused lucene file elements... unused elements are the result of unexpected shutdowns... //this can add up to a lot of after a while. - lucene::index::IndexReader* reader = manager->refReader(); + lucene::index::IndexReader* reader = manager->getReader()->reader; if (!reader) { - manager->derefReader(); return; } lucene::store::Directory* directory = reader->getDirectory(); @@ -291,7 +292,6 @@ bool locked = lock->obtain(lucene::index::IndexWriter::COMMIT_LOCK_TIMEOUT); #endif if (!locked) { - manager->derefReader(); return; } lucene::index::SegmentInfos infos; @@ -300,7 +300,6 @@ infos.read(directory); } catch(...) { lock->release(); - manager->derefReader(); return; //todo: this may suggest an error... } lock->release(); @@ -344,6 +343,5 @@ } _CLDELETE_ARRAY(files) - manager->derefReader(); } --- trunk/playground/base/strigi/src/streamindexer/indexerconfiguration.cpp #609481:609482 @@ -69,7 +69,6 @@ FNM_PERIOD); } if (match) { - printf("dir '%s' %i\n", path, i->include); return i->include; } }