[prev in list] [next in list] [prev in thread] [next in thread] 

List:       mysql-internals
Subject:    bk commit into 5.1 tree (jonas:1.1986)
From:       jonas () mysql ! com
Date:       2005-12-21 15:42:01
Message-ID: 20051221154201.86FF021C358 () perch ! ndb ! mysql ! com
[Download RAW message or body]

Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1986 05/12/21 16:41:55 jonas@perch.ndb.mysql.com +16 -0
  wl2723 - ndb optimized node recovery
    functional with following limitations
    1) ordered indexes get corrupted
    2) disk part is not released during sync
    3) nr scan is never run in disk order

  storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp
    1.3 05/12/21 16:41:53 jonas@perch.ndb.mysql.com +1 -0
    Introduce Fix_page::FREE_RECORD to avoid confusion with Tuple_header::FREE

  storage/ndb/src/kernel/blocks/dbtup/tuppage.cpp
    1.3 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +5 -5
    Introduce Fix_page::FREE_RECORD to avoid confusion with Tuple_header::FREE

  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
    1.13 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +34 -23
    Fixes to NR scan

  storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp
    1.8 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +44 -24
    1) New method for page allocation (wrt ROWID) that is also used during OptNr
    2) Introduce Fix_page::FREE_RECORD to avoid confusion with Tuple_header::FREE

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    1.33 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +127 -0
    Add support methods for OptNr

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
    1.10 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +4 -0
    return disk_ref in TUP_DEALLOCREQ

  storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp
    1.9 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +13 -2
    Clear ALLOC bit on abort of insert

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
    1.36 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +6 -5
    Add support methods for OptNr

  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
    1.92 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +775 -426
    1) Remove unused constants
    2) Change NR according to OptNr.txt
    3) Remove unused ACC/TUP BLOCK/UNBLOCK
    4) Change START_FRAGREQ/STARTRECREQ to be able to run only locally

  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
    1.45 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +21 -33
    1) Remove unused constants
    2) Change NR according to OptNr.txt
    3) Remove unused ACC/TUP BLOCK/UNBLOCK
    4) Change START_FRAGREQ/STARTRECREQ to be able to run only locally

  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
    1.40 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +181 -10
    Changes to NR, start fragments on starting node before starting to copy
    New continueb, for starting fragments on starting node

  storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
    1.12 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +10 -2
    New continueb, for starting fragments on starting node

  storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
    1.67 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +7 -6
    remove unused constants

  storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
    1.27 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +0 -6
    remove unused constants

  storage/ndb/include/kernel/signaldata/NextScan.hpp
    1.6 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +1 -9
    remove unused constants

  storage/ndb/include/kernel/signaldata/DihContinueB.hpp
    1.4 05/12/21 16:41:52 jonas@perch.ndb.mysql.com +2 -1
    New continueb, for starting fragments on starting node

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/51-ndb

--- 1.2/storage/ndb/src/kernel/blocks/dbtup/tuppage.cpp	2005-11-04 14:35:51 +01:00
+++ 1.3/storage/ndb/src/kernel/blocks/dbtup/tuppage.cpp	2005-12-21 16:41:52 +01:00
@@ -43,7 +43,7 @@
   Uint32 next = m_data[page_idx] & 0xFFFF;
 
   assert(prev == 0xFFFF);
-  assert(m_data[page_idx + 1] == Dbtup::Tuple_header::FREE);
+  assert(m_data[page_idx + 1] == FREE_RECORD);
   
   m_data[page_idx + 1] = 0;
   if (next != 0xFFFF)
@@ -67,7 +67,7 @@
 Tup_fixsize_page::alloc_record(Uint32 page_idx)
 {
   assert(page_idx + 1 < DATA_WORDS);
-  if (likely(free_space && m_data[page_idx + 1] == Dbtup::Tuple_header::FREE))
+  if (likely(free_space && m_data[page_idx + 1] == FREE_RECORD))
   {
     Uint32 prev = m_data[page_idx] >> 16;
     Uint32 next = m_data[page_idx] & 0xFFFF;
@@ -101,7 +101,7 @@
   Uint32 next = next_free_index;
   
   assert(page_idx + 1 < DATA_WORDS);
-  assert(m_data[page_idx + 1] != Dbtup::Tuple_header::FREE);
+  assert(m_data[page_idx + 1] != FREE_RECORD);
 
   if (next == 0xFFFF)
   {
@@ -114,12 +114,12 @@
     Uint32 nextP = m_data[next];
     assert((nextP >> 16) == 0xFFFF);
     m_data[next] = (page_idx << 16) | (nextP & 0xFFFF);
-    assert(m_data[next + 1] == Dbtup::Tuple_header::FREE);
+    assert(m_data[next + 1] == FREE_RECORD);
   }
 
   next_free_index = page_idx;
   m_data[page_idx] = 0xFFFF0000 | next;
-  m_data[page_idx + 1] = Dbtup::Tuple_header::FREE;
+  m_data[page_idx + 1] = FREE_RECORD;
   
   return ++free_space;
 }

--- 1.2/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp	2005-10-31 16:23:57 +01:00
+++ 1.3/storage/ndb/src/kernel/blocks/dbtup/tuppage.hpp	2005-12-21 16:41:53 +01:00
@@ -81,6 +81,7 @@
   Uint32 m_extent_info_ptr;
   Uint32 unused_ph[9];
 
+  STATIC_CONST( FREE_RECORD = ~(Uint32)0 );
   STATIC_CONST( DATA_WORDS = File_formats::NDB_PAGE_SIZE_WORDS - 32 );
   
   Uint32 m_data[DATA_WORDS];

--- 1.3/storage/ndb/include/kernel/signaldata/DihContinueB.hpp	2005-04-08 02:43:50 +02:00
+++ 1.4/storage/ndb/include/kernel/signaldata/DihContinueB.hpp	2005-12-21 16:41:52 +01:00
@@ -69,7 +69,8 @@
     ZSEND_END_TO                 = 41,
 
     WAIT_DROP_TAB_WRITING_TO_FILE = 42,
-    CHECK_WAIT_DROP_TAB_FAILED_LQH = 43
+    CHECK_WAIT_DROP_TAB_FAILED_LQH = 43,
+    ZTO_START_FRAGMENTS = 44
   };
 };
 

--- 1.5/storage/ndb/include/kernel/signaldata/NextScan.hpp	2005-12-05 16:54:20 +01:00
+++ 1.6/storage/ndb/include/kernel/signaldata/NextScan.hpp	2005-12-21 16:41:52 +01:00
@@ -33,14 +33,6 @@
     ZSCAN_CLOSE = 6,
     ZSCAN_NEXT_ABORT = 12
   };
-  enum CopyFlag {
-    todo_ZCOPY_NEXT = 1,
-    todo_ZCOPY_NEXT_COMMIT = 2,
-    todo_ZCOPY_COMMIT = 3,
-    todo_ZCOPY_REPEAT = 4,
-    todo_ZCOPY_ABORT = 5,
-    todo_ZCOPY_CLOSE = 6
-  };
   STATIC_CONST( SignalLength = 3 );
 private:
   Uint32 accPtr;                // scan record in ACC/TUX
@@ -62,7 +54,7 @@
   Uint32 fragId;
   Uint32 localKey[2];
   Uint32 localKeyLength;
-  Uint32 keyLength;
+  Uint32 gci;
 };
 
 #endif

--- 1.26/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2005-12-21 11:22:33 +01:00
+++ 1.27/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp	2005-12-21 16:41:52 +01:00
@@ -102,12 +102,6 @@
 #define ZDEFAULT_LIST 3
 #define ZWORDS_IN_PAGE 2048
 #define ZADDFRAG 0
-#define ZCOPY_NEXT 1
-#define ZCOPY_NEXT_COMMIT 2
-#define ZCOPY_COMMIT 3
-#define ZCOPY_REPEAT 4
-#define ZCOPY_ABORT 5
-#define ZCOPY_CLOSE 6
 #define ZDIRARRAY 68
 #define ZDIRRANGESIZE 65
 //#define ZEMPTY_FRAGMENT 0

--- 1.66/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2005-12-21 11:24:27 +01:00
+++ 1.67/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2005-12-21 16:41:52 +01:00
@@ -21,6 +21,7 @@
 #include <AttributeHeader.hpp>
 #include <signaldata/AccFrag.hpp>
 #include <signaldata/AccScan.hpp>
+#include <signaldata/NextScan.hpp>
 #include <signaldata/AccLock.hpp>
 #include <signaldata/EventReport.hpp>
 #include <signaldata/FsConf.hpp>
@@ -1784,7 +1785,7 @@
 void Dbacc::accAbortReqLab(Signal* signal)
 {
   operationRecPtr.i = signal->theData[0];
-  bool sendConf = signal->theData[1] || signal->getLength() == 1;
+  bool sendConf = signal->theData[1];
   ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
   tresult = 0;	/*                ZFALSE           */
   if ((operationRecPtr.p->transactionstate == ACTIVE) ||
@@ -5389,12 +5390,12 @@
 
   scanPtr.p->scanTimer = scanPtr.p->scanContinuebCounter;
   switch (tscanNextFlag) {
-  case ZCOPY_NEXT:
+  case NextScanReq::ZSCAN_NEXT:
     jam();
     /*empty*/;
     break;
-  case ZCOPY_NEXT_COMMIT:
-  case ZCOPY_COMMIT:
+  case NextScanReq::ZSCAN_NEXT_COMMIT:
+  case NextScanReq::ZSCAN_COMMIT:
     jam();
     /* --------------------------------------------------------------------- */
     /* COMMIT ACTIVE OPERATION. 
@@ -5409,7 +5410,7 @@
     takeOutActiveScanOp(signal);
     releaseOpRec(signal);
     scanPtr.p->scanOpsAllocated--;
-    if (tscanNextFlag == ZCOPY_COMMIT) {
+    if (tscanNextFlag == NextScanReq::ZSCAN_COMMIT) {
       jam();
       signal->theData[0] = scanPtr.p->scanUserptr;
       Uint32 blockNo = refToBlock(scanPtr.p->scanUserblockref);
@@ -5417,7 +5418,7 @@
       return;
     }//if
     break;
-  case ZCOPY_CLOSE:
+  case NextScanReq::ZSCAN_CLOSE:
     jam();
     fragrecptr.i = scanPtr.p->activeLocalFrag;
     ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);

--- 1.11/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2005-11-28 14:11:57 +01:00
+++ 1.12/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2005-12-21 16:41:52 +01:00
@@ -547,7 +547,9 @@
       TO_END_COPY = 19,
       TO_END_COPY_ONGOING = 20,
       TO_WAIT_ENDING = 21,
-      ENDING = 22
+      ENDING = 22,
+      
+      STARTING_LOCAL_FRAGMENTS = 24
     };
     enum ToSlaveStatus {
       TO_SLAVE_IDLE = 0,
@@ -976,7 +978,9 @@
   void initialiseRecordsLab(Signal *, Uint32 stepNo, Uint32, Uint32);
 
   void findReplica(ReplicaRecordPtr& regReplicaPtr,
-                   Fragmentstore* fragPtrP, Uint32 nodeId);
+                   Fragmentstore* fragPtrP, 
+		   Uint32 nodeId,
+		   bool oldStoredReplicas = false);
 //------------------------------------
 // Node failure handling methods
 //------------------------------------
@@ -1134,6 +1138,10 @@
   void setNodeCopyCompleted(Uint32 nodeId, bool newState);
   bool checkNodeAlive(Uint32 nodeId);
 
+  void nr_start_fragments(Signal*, TakeOverRecordPtr);
+  void nr_start_fragment(Signal*, TakeOverRecordPtr, ReplicaRecordPtr);
+  void nr_run_redo(Signal*, TakeOverRecordPtr);
+  
   // Initialisation
   void initData();
   void initRecords();

--- 1.39/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2005-11-28 14:11:57 +01:00
+++ 1.40/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2005-12-21 16:41:52 +01:00
@@ -609,6 +609,14 @@
     checkWaitDropTabFailedLqh(signal, nodeId, tableId);
     return;
   }
+  case DihContinueB::ZTO_START_FRAGMENTS:
+  {
+    TakeOverRecordPtr takeOverPtr;
+    takeOverPtr.i = signal->theData[1];
+    ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
   }//switch
   
   ndbrequire(false);
@@ -1771,11 +1779,6 @@
   ndbrequire(c_nodeStartMaster.startNode == Tnodeid);
   ndbrequire(getNodeStatus(Tnodeid) == NodeRecord::STARTING);
   
-  sendSTART_RECREQ(signal, Tnodeid);
-}//Dbdih::execSTART_MEREQ()
-
-void Dbdih::nodeRestartStartRecConfLab(Signal* signal) 
-{
   c_nodeStartMaster.blockLcp = true;
   if ((c_lcpState.lcpStatus != LCP_STATUS_IDLE) &&
       (c_lcpState.lcpStatus != LCP_TCGET)) {
@@ -2562,13 +2565,14 @@
     return;
   }//if
   c_startToLock = takeOverPtrI;
+
+  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   StartToReq * const req = (StartToReq *)&signal->theData[0];
   req->userPtr = takeOverPtr.i;
   req->userRef = reference();
   req->startingNodeId = takeOverPtr.p->toStartingNode;
   req->nodeTakenOver = takeOverPtr.p->toFailedNode;
   req->nodeRestart = takeOverPtr.p->toNodeRestart;
-  takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING;
   sendLoopMacro(START_TOREQ, sendSTART_TOREQ);
 }//Dbdih::sendStartTo()
 
@@ -2612,9 +2616,153 @@
   CRASH_INSERTION(7134);
   c_startToLock = RNIL;
 
+  if (takeOverPtr.p->toNodeRestart)
+  {
+    jam();
+    takeOverPtr.p->toMasterStatus = TakeOverRecord::STARTING_LOCAL_FRAGMENTS;
+    nr_start_fragments(signal, takeOverPtr);
+    return;
+  }
+
   startNextCopyFragment(signal, takeOverPtr.i);
 }//Dbdih::execSTART_TOCONF()
 
+void
+Dbdih::nr_start_fragments(Signal* signal, 
+			  TakeOverRecordPtr takeOverPtr)
+{
+  Uint32 loopCount = 0 ;
+  TabRecordPtr tabPtr;
+  while (loopCount++ < 100) {
+    tabPtr.i = takeOverPtr.p->toCurrentTabref;
+    if (tabPtr.i >= ctabFileSize) {
+      jam();
+      nr_run_redo(signal, takeOverPtr);
+      return;
+    }//if
+    ptrAss(tabPtr, tabRecord);
+    if (tabPtr.p->tabStatus != TabRecord::TS_ACTIVE){
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    Uint32 fragId = takeOverPtr.p->toCurrentFragid;
+    if (fragId >= tabPtr.p->totalfragments) {
+      jam();
+      takeOverPtr.p->toCurrentFragid = 0;
+      takeOverPtr.p->toCurrentTabref++;
+      continue;
+    }//if
+    FragmentstorePtr fragPtr;
+    getFragstore(tabPtr.p, fragId, fragPtr);
+    ReplicaRecordPtr loopReplicaPtr;
+    loopReplicaPtr.i = fragPtr.p->oldStoredReplicas;
+    while (loopReplicaPtr.i != RNIL) {
+      ptrCheckGuard(loopReplicaPtr, creplicaFileSize, replicaRecord);
+      if (loopReplicaPtr.p->procNode == takeOverPtr.p->toStartingNode) {
+        jam();
+	nr_start_fragment(signal, takeOverPtr, loopReplicaPtr);
+	break;
+      } else {
+        jam();
+        loopReplicaPtr.i = loopReplicaPtr.p->nextReplica;
+      }//if
+    }//while
+    takeOverPtr.p->toCurrentFragid++;
+  }//while
+  signal->theData[0] = DihContinueB::ZTO_START_FRAGMENTS;
+  signal->theData[1] = takeOverPtr.i;
+  sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
+}
+
+void
+Dbdih::nr_start_fragment(Signal* signal, 
+			 TakeOverRecordPtr takeOverPtr,
+			 ReplicaRecordPtr replicaPtr)
+{
+  Uint32 i, j = 0;
+  Uint32 maxLcpId = 0;
+  Uint32 maxLcpIndex = ~0;
+  
+  Uint32 restorableGCI = 0;
+  
+  ndbout_c("tab: %d frag: %d replicaP->nextLcp: %d",
+	   takeOverPtr.p->toCurrentTabref,
+	   takeOverPtr.p->toCurrentFragid,
+	   replicaPtr.p->nextLcp);
+  
+  Uint32 idx = replicaPtr.p->nextLcp;
+  for(i = 0; i<MAX_LCP_STORED; i++, idx = nextLcpNo(idx))
+  {
+    ndbout_c("scanning idx: %d lcpId: %d", idx, replicaPtr.p->lcpId[idx]);
+    if (replicaPtr.p->lcpStatus[idx] == ZVALID) 
+    {
+      ndbrequire(replicaPtr.p->lcpId[idx] > maxLcpId);
+      Uint32 startGci = replicaPtr.p->maxGciCompleted[idx];
+      Uint32 stopGci = replicaPtr.p->maxGciStarted[idx];
+      for (;j < replicaPtr.p->noCrashedReplicas; j++)
+      {
+	ndbout_c("crashed replica: %d(%d) replicaLastGci: %d",
+		 j, 
+		 replicaPtr.p->noCrashedReplicas,
+		 replicaPtr.p->replicaLastGci[j]);
+	if (replicaPtr.p->replicaLastGci[j] > stopGci)
+	{
+	  maxLcpId = replicaPtr.p->lcpId[idx];
+	  maxLcpIndex = idx;
+	  restorableGCI = replicaPtr.p->replicaLastGci[j];
+	  break;
+	}
+      }
+    }
+  }
+  
+  if (maxLcpIndex == ~0)
+  {
+    ndbout_c("Didnt find any LCP for node: %d tab: %d frag: %d",
+	     takeOverPtr.p->toStartingNode,
+	     takeOverPtr.p->toCurrentTabref,
+	     takeOverPtr.p->toCurrentFragid);
+    replicaPtr.p->lcpIdStarted = 0;
+  }
+  else
+  {
+    ndbout_c("Found LCP: %d(%d) maxGciStarted: %d maxGciCompleted: %d restorable: %d(%d) newestRestorableGCI: %d",
+	     maxLcpId,
+	     maxLcpIndex,
+	     replicaPtr.p->maxGciStarted[maxLcpIndex],
+	     replicaPtr.p->maxGciCompleted[maxLcpIndex],	     
+	     restorableGCI,
+	     SYSFILE->lastCompletedGCI[takeOverPtr.p->toStartingNode],
+	     SYSFILE->newestRestorableGCI);
+
+    replicaPtr.p->lcpIdStarted = restorableGCI;
+    BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toStartingNode);
+    StartFragReq *req = (StartFragReq *)signal->getDataPtrSend();
+    req->userPtr = 0;
+    req->userRef = reference();
+    req->lcpNo = maxLcpIndex;
+    req->lcpId = maxLcpId;
+    req->tableId = takeOverPtr.p->toCurrentTabref;
+    req->fragId = takeOverPtr.p->toCurrentFragid;
+    req->noOfLogNodes = 1;
+    req->lqhLogNode[0] = takeOverPtr.p->toStartingNode;
+    req->startGci[0] = replicaPtr.p->maxGciCompleted[maxLcpIndex];
+    req->lastGci[0] = restorableGCI;
+    sendSignal(ref, GSN_START_FRAGREQ, signal, 
+	       StartFragReq::SignalLength, JBB);
+  }
+}
+
+void
+Dbdih::nr_run_redo(Signal* signal, TakeOverRecordPtr takeOverPtr)
+{
+  takeOverPtr.p->toCurrentTabref = 0;
+  takeOverPtr.p->toCurrentFragid = 0;
+  sendSTART_RECREQ(signal, takeOverPtr.p->toStartingNode);
+}
+
 void Dbdih::initStartTakeOver(const StartToReq * req, 
 			      TakeOverRecordPtr takeOverPtr)
 {
@@ -2947,6 +3095,14 @@
     /*---------------------------------------------------------------------- */
     FragmentstorePtr fragPtr;
     getFragstore(tabPtr.p, fragId, fragPtr);
+    Uint32 gci = 0;
+    if (takeOverPtr.p->toNodeRestart)
+    {
+      ReplicaRecordPtr replicaPtr;
+      findReplica(replicaPtr, fragPtr.p, takeOverPtr.p->toStartingNode, true);
+      gci = replicaPtr.p->lcpIdStarted;
+      replicaPtr.p->lcpIdStarted = 0;
+    }
     takeOverPtr.p->toMasterStatus = TakeOverRecord::COPY_FRAG;
     BlockReference ref = calcLqhBlockRef(takeOverPtr.p->toCopyNode);
     CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
@@ -2957,6 +3113,7 @@
     copyFragReq->nodeId = takeOverPtr.p->toStartingNode;
     copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
     copyFragReq->distributionKey = fragPtr.p->distributionKey;
+    copyFragReq->gci = gci;
     sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
   } else {
     ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
@@ -3995,6 +4152,8 @@
 						  Uint32 takeOverPtrI)
 {
   jam();
+  ndbout_c("checkTakeOverInMasterStartNodeFailure %x",
+	   takeOverPtrI);
   if (takeOverPtrI == RNIL) {
     jam();
     return;
@@ -4008,6 +4167,9 @@
   takeOverPtr.i = takeOverPtrI;
   ptrCheckGuard(takeOverPtr, MAX_NDB_NODES, takeOverRecord);
 
+  ndbout_c("takeOverPtr.p->toMasterStatus: %x", 
+	   takeOverPtr.p->toMasterStatus);
+  
   bool ok = false;
   switch (takeOverPtr.p->toMasterStatus) {
   case TakeOverRecord::IDLE:
@@ -4116,6 +4278,13 @@
     //-----------------------------------------------------------------------
     endTakeOver(takeOverPtr.i);
     break;
+
+  case TakeOverRecord::STARTING_LOCAL_FRAGMENTS:
+    ok = true;
+    jam();
+    endTakeOver(takeOverPtr.i);
+    break;
+    
     /**
      * The following are states that it should not be possible to "be" in
      */
@@ -8830,8 +8999,8 @@
     // otherwise we have a problem.
     /* --------------------------------------------------------------------- */
     jam();
-    ndbrequire(senderNodeId == c_nodeStartMaster.startNode);
-    nodeRestartStartRecConfLab(signal);
+    ndbout_c("startNextCopyFragment");
+    startNextCopyFragment(signal, findTakeOver(senderNodeId));
     return;
   } else {
     /* --------------------------------------------------------------------- */
@@ -9850,9 +10019,11 @@
 }
 
 void Dbdih::findReplica(ReplicaRecordPtr& replicaPtr, 
-			Fragmentstore* fragPtrP, Uint32 nodeId)
+			Fragmentstore* fragPtrP, 
+			Uint32 nodeId,
+			bool old)
 {
-  replicaPtr.i = fragPtrP->storedReplicas;
+  replicaPtr.i = old ? fragPtrP->oldStoredReplicas : fragPtrP->storedReplicas;
   while(replicaPtr.i != RNIL){
     ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
     if (replicaPtr.p->procNode == nodeId) {

--- 1.44/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2005-11-28 14:11:57 +01:00
+++ 1.45/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2005-12-21 16:41:52 +01:00
@@ -264,15 +264,6 @@
 #define ZSTORED_PROC_SCAN 0
 #define ZSTORED_PROC_COPY 2
 #define ZDELETE_STORED_PROC_ID 3
-//#define ZSCAN_NEXT 1
-//#define ZSCAN_NEXT_COMMIT 2
-//#define ZSCAN_NEXT_ABORT 12
-#define ZCOPY_COMMIT 3
-#define ZCOPY_REPEAT 4
-#define ZCOPY_ABORT 5
-#define ZCOPY_CLOSE 6
-//#define ZSCAN_CLOSE 6
-//#define ZEMPTY_FRAGMENT 0
 #define ZWRITE_LOCK 1
 #define ZSCAN_FRAG_CLOSED 2
 /* ------------------------------------------------------------------------- */
@@ -746,10 +737,16 @@
     FragStatus fragStatus;
 
     /**
-     *       Indicates a local checkpoint is active and thus can generate
-     *       UNDO log records.
-     */
-    UintR fragActiveStatus;
+     * 0 = undefined i.e fragStatus != ACTIVE_CREATION
+     * 1 = yes
+     * 2 = no
+     */
+    enum ActiveCreat {
+      AC_NORMAL = 0,  // fragStatus != ACTIVE_CREATION
+      AC_IGNORED = 1, // Operation that got ignored during NR
+      AC_NR_COPY = 2  // Operation that got performed during NR
+    };
+    Uint8 m_copy_started_state; 
 
     /**
      *       This flag indicates whether logging is currently activated at 
@@ -2102,10 +2099,6 @@
   void execBUILDINDXCONF(Signal*signal);
   
   void execDUMP_STATE_ORD(Signal* signal);
-  void execACC_COM_BLOCK(Signal* signal);
-  void execACC_COM_UNBLOCK(Signal* signal);
-  void execTUP_COM_BLOCK(Signal* signal);
-  void execTUP_COM_UNBLOCK(Signal* signal);
   void execACC_ABORTCONF(Signal* signal);
   void execNODE_FAILREP(Signal* signal);
   void execCHECK_LCP_STOP(Signal* signal);
@@ -2242,7 +2235,7 @@
   Uint32 sendKeyinfo20(Signal* signal, ScanRecord *, TcConnectionrec *);
   void sendScanFragConf(Signal* signal, Uint32 scanCompleted);
   void initCopyrec(Signal* signal);
-  void initCopyTc(Signal* signal);
+  void initCopyTc(Signal* signal, Operation_t);
   void sendCopyActiveConf(Signal* signal,Uint32 tableId);
   void checkLcpCompleted(Signal* signal);
   void checkLcpHoldop(Signal* signal);
@@ -2569,7 +2562,12 @@
 
   void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
   void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
-
+  
+  void handle_nr_copy(Signal*, Ptr<TcConnectionrec>);
+  void exec_acckeyreq(Signal*, Ptr<TcConnectionrec>);
+  int compare_key(const TcConnectionrec*, const Uint32 * ptr, Uint32 len);
+  void nr_copy_delete_row(Signal*, Ptr<TcConnectionrec>, Local_key*, Uint32);
+  
 public:
   void acckeyconf_load_diskpage_callback(Signal*, Uint32, Uint32);
 
@@ -2831,11 +2829,6 @@
   UintR cLqhTimeOutCount;
   UintR cLqhTimeOutCheckCount;
   UintR cnoOfLogPages;
-  bool  caccCommitBlocked;
-  bool  ctupCommitBlocked;
-  bool  cCommitBlocked;
-  UintR cCounterAccCommitBlocked;
-  UintR cCounterTupCommitBlocked;
 /* ------------------------------------------------------------------------- */
 /*THIS VARIABLE CONTAINS MY OWN PROCESSOR ID.                                */
 /* ------------------------------------------------------------------------- */
@@ -2855,16 +2848,11 @@
   Uint16 cpackedList[MAX_NDB_NODES];
   UintR cnodeData[MAX_NDB_NODES];
   UintR cnodeStatus[MAX_NDB_NODES];
-/* ------------------------------------------------------------------------- */
-/*THIS VARIABLE INDICATES WHETHER A CERTAIN NODE HAS SENT ALL FRAGMENTS THAT */
-/*NEED TO HAVE THE LOG EXECUTED.                                             */
-/* ------------------------------------------------------------------------- */
-  Uint8 cnodeSrState[MAX_NDB_NODES];
-/* ------------------------------------------------------------------------- */
-/*THIS VARIABLE INDICATES WHETHER A CERTAIN NODE HAVE EXECUTED THE LOG       */
-/* ------------------------------------------------------------------------- */
-  Uint8 cnodeExecSrState[MAX_NDB_NODES];
   UintR cnoOfNodes;
+
+  NdbNodeBitmask m_sr_nodes;
+  NdbNodeBitmask m_sr_exec_sr_req;
+  NdbNodeBitmask m_sr_exec_sr_conf;
 
 /* ------------------------------------------------------------------------- */
 /* THIS VARIABLE CONTAINS THE DIRECTORY OF A HASH TABLE OF ALL ACTIVE        */

--- 1.91/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2005-12-06 09:24:55 +01:00
+++ 1.92/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2005-12-21 16:41:52 +01:00
@@ -118,57 +118,9 @@
 
 const Uint32 NR_ScanNo = 0;
 
-void Dblqh::execACC_COM_BLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in ACC is in critical sector of being full.
-/* ------------------------------------------------------------------------- */
-  cCounterAccCommitBlocked++;
-  caccCommitBlocked = true;
-  cCommitBlocked = true;
-  return;
-}//Dblqh::execACC_COM_BLOCK()
-
-void Dblqh::execACC_COM_UNBLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in ACC ok again.
-/* ------------------------------------------------------------------------- */
-  caccCommitBlocked = false;
-  if (ctupCommitBlocked == false) {
-    jam();
-    cCommitBlocked = false;
-  }//if
-  return;
-}//Dblqh::execACC_COM_UNBLOCK()
-
-void Dblqh::execTUP_COM_BLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in TUP is in critical sector of being full.
-/* ------------------------------------------------------------------------- */
-  cCounterTupCommitBlocked++;
-  ctupCommitBlocked = true;
-  cCommitBlocked = true;
-  return;
-}//Dblqh::execTUP_COM_BLOCK()
-
-void Dblqh::execTUP_COM_UNBLOCK(Signal* signal)
-{
-  jamEntry();
-/* ------------------------------------------------------------------------- */
-// Undo log buffer in TUP ok again.
-/* ------------------------------------------------------------------------- */
-  ctupCommitBlocked = false;
-  if (caccCommitBlocked == false) {
-    jam();
-    cCommitBlocked = false;
-  }//if
-  return;
-}//Dblqh::execTUP_COM_UNBLOCK()
+static FileOutputStream pkk_fileoutputstream(fopen("pkk.log", "w+"));
+NdbOut pkkout(pkk_fileoutputstream);
+static int PKK = 0;
 
 /* ------------------------------------------------------------------------- */
 /* -------               SEND SYSTEM ERROR                           ------- */
@@ -242,12 +194,6 @@
     ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
     fragptr.i = tcConnectptr.p->fragmentptr;
     c_fragment_pool.getPtr(fragptr);
-    if ((cCommitBlocked == true) &&
-        (fragptr.p->fragActiveStatus == ZTRUE)) {
-      jam();
-      sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2);
-      return;
-    }//if
     logPartPtr.p->LogLqhKeyReqSent = ZFALSE;
     getFirstInLogQueue(signal);
 
@@ -289,12 +235,13 @@
       writeCommitLog(signal, logPartPtr);
       logNextStart(signal);
       if (tcConnectptr.p->transactionState == TcConnectionrec::LOG_COMMIT_QUEUED) {
-        if (tcConnectptr.p->seqNoReplica != 0) {
+        if (tcConnectptr.p->seqNoReplica == 0 ||
+	    tcConnectptr.p->activeCreat == Fragrecord::AC_NR_COPY) {
           jam();
-          commitReplyLab(signal);
+          localCommitLab(signal);
         } else {
           jam();
-          localCommitLab(signal);
+          commitReplyLab(signal);
         }//if
         return;
       } else {
@@ -562,6 +509,7 @@
     LqhKeyReq::setSimpleFlag(preComputedRequestInfoMask, 1);
     LqhKeyReq::setOperation(preComputedRequestInfoMask, RI_OPERATION_MASK);
     LqhKeyReq::setGCIFlag(preComputedRequestInfoMask, 1);
+    LqhKeyReq::setNrCopyFlag(preComputedRequestInfoMask, 1);
     // Dont setAIInLqhKeyReq
     // Dont setSeqNoReplica
     // Dont setSameClientAndTcFlag
@@ -813,6 +761,7 @@
     jam();
     if (NodeBitmask::get(readNodes->allNodes, i)) {
       jam();
+      m_sr_nodes.set(i);
       cnodeData[ind]    = i;
       cnodeStatus[ind]  = NodeBitmask::get(readNodes->inactiveNodes, i);
       //readNodes->getVersionId(i, readNodes->theVersionIds) not used
@@ -824,11 +773,23 @@
   ndbrequire(!(cnoOfNodes == 1 && cstartType == NodeState::ST_NODE_RESTART));
   
   caddNodeState = ZFALSE;
-  if (cstartType == NodeState::ST_SYSTEM_RESTART) {
+  if (cstartType == NodeState::ST_SYSTEM_RESTART) 
+  {
     jam();
     sendNdbSttorryLab(signal);
     return;
-  }//if
+  } 
+  else if (cstartType == NodeState::ST_NODE_RESTART)
+  {
+    jam();
+    PKK = 1;
+    m_sr_nodes.clear();
+    m_sr_nodes.set(getOwnNodeId());
+    sendNdbSttorryLab(signal);
+    return;
+  }
+  PKK = 1;
+  
   checkStartCompletedLab(signal);
   return;
 }//Dblqh::execREAD_NODESCONF()
@@ -858,6 +819,7 @@
 {
   cstartPhase = ZNIL;
   cstartType = ZNIL;
+  PKK = 0;
   sendNdbSttorryLab(signal);
   return;
 }//Dblqh::startphase6Lab()
@@ -1394,6 +1356,7 @@
         jam();
         if (! DictTabInfo::isOrderedIndex(addfragptr.p->tableType))
 	{
+	  fragptr.p->m_copy_started_state = Fragrecord::AC_IGNORED;
           fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;
 	}
         else
@@ -2031,17 +1994,6 @@
   jamEntry();
   cLqhTimeOutCount++;
   cLqhTimeOutCheckCount++;
-  if ((cCounterAccCommitBlocked > 0) ||
-     (cCounterTupCommitBlocked > 0)) {
-    jam();
-    signal->theData[0] = NDB_LE_UndoLogBlocked;
-    signal->theData[1] = cCounterTupCommitBlocked;
-    signal->theData[2] = cCounterAccCommitBlocked;
-    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
-
-    cCounterTupCommitBlocked = 0;
-    cCounterAccCommitBlocked = 0;
-  }//if
   if (cLqhTimeOutCheckCount < 10) {
     jam();
     return;
@@ -2543,6 +2495,7 @@
   switch (tcConnectptr.p->transactionState) {
   case TcConnectionrec::WAIT_TUP:
     jam();
+    ndbrequire(terrorCode != 626);
     if (regTcPtr->activeCreat == ZTRUE && 
 	regTcPtr->operation == ZINSERT &&
 	terrorCode == 899)
@@ -3362,24 +3315,26 @@
     } else {
       nextPos += 4;
     }//if
-  } else {
+  } 
+  else if (! (LqhKeyReq::getNrCopyFlag(Treqinfo)))
+  {
     LQHKEY_error(signal, 3);
     return;
   }//if
-
+  
   sig0 = lqhKeyReq->variableData[nextPos + 0];
   sig1 = lqhKeyReq->variableData[nextPos + 1];
-  sig2 = lqhKeyReq->variableData[nextPos + 2];
-  sig3 = cnewestGci;
   regTcPtr->m_row_id.m_page_no = sig0;
   regTcPtr->m_row_id.m_page_idx = sig1;
-  regTcPtr->gci = LqhKeyReq::getGCIFlag(Treqinfo) ? sig2 : sig3;
   nextPos += 2 * LqhKeyReq::getRowidFlag(Treqinfo);
-  nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
 
+  sig2 = lqhKeyReq->variableData[nextPos + 0];
+  sig3 = cnewestGci;
+  regTcPtr->gci = LqhKeyReq::getGCIFlag(Treqinfo) ? sig2 : sig3;
+  nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
+  
   if (LqhKeyReq::getRowidFlag(Treqinfo))
   {
-    ndbassert(op == ZINSERT);
     ndbassert(refToBlock(senderRef) != DBTC);
   }
   else if(op == ZINSERT)
@@ -3454,15 +3409,28 @@
     return;
   }//if
 
+  if (LqhKeyReq::getNrCopyFlag(Treqinfo))
+  {
+    ndbassert(refToBlock(senderRef) == DBLQH);
+    ndbassert(LqhKeyReq::getRowidFlag(Treqinfo));
+    if (! (fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION))
+    {
+      ndbout_c("fragptr.p->fragStatus: %d",
+	       fragptr.p->fragStatus);
+    }
+    ndbassert(fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION);
+    fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
+  }
+  
   Uint8 TcopyType = fragptr.p->fragCopy;
   Uint32 logPart = fragptr.p->m_log_part_ptr_i;
   tfragDistKey = fragptr.p->fragDistributionKey;
   if (fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION) {
     jam();
-    regTcPtr->activeCreat = ZTRUE;
+    regTcPtr->activeCreat = fragptr.p->m_copy_started_state;
     CRASH_INSERTION(5002);
   } else {
-    regTcPtr->activeCreat = ZFALSE;
+    regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   }//if
   regTcPtr->replicaType = TcopyType;
   regTcPtr->fragmentptr = fragptr.i;
@@ -3664,7 +3632,6 @@
 void Dblqh::prepareContinueAfterBlockedLab(Signal* signal) 
 {
   UintR ttcScanOp;
-  UintR taccreq;
 
 /* -------------------------------------------------------------------------- */
 /*       INPUT:          TC_CONNECTPTR           ACTIVE CONNECTION RECORD     */
@@ -3677,6 +3644,7 @@
 /* -------------------------------------------------------------------------- */
   Uint32 tc_ptr_i = tcConnectptr.i;
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
+  Uint32 activeCreat = regTcPtr->activeCreat;
   if (regTcPtr->indTakeOver == ZTRUE) {
     jam();
     ttcScanOp = KeyInfo20::getScanOp(regTcPtr->tcScanInfo);
@@ -3725,36 +3693,75 @@
 /*       BEFORE SENDING THE MESSAGE THE REQUEST INFORMATION IS SET   */
 /*       PROPERLY.                                                   */
 /* ----------------------------------------------------------------- */
-#if 0
-  if (regTcPtr->tableref != 0) {
+  if (PKK)
+  {
     switch (regTcPtr->operation) {
-    case ZREAD: ndbout << "Läsning "; break;
-    case ZUPDATE: ndbout << " Uppdatering "; break;
-    case ZWRITE: ndbout << "Write "; break;
-    case ZINSERT: ndbout << "Inläggning "; break;
-    case ZDELETE: ndbout << "Borttagning "; break;
-    default: ndbout << "????"; break;
-    }
-    ndbout << "med nyckel = " << regTcPtr->tupkeyData[0] << endl;
+    case ZREAD: pkkout << "READ"; break;
+    case ZUPDATE: pkkout << "UPDATE"; break;
+    case ZWRITE: pkkout << "WRITE"; break;
+    case ZINSERT: pkkout << "INSERT"; break;
+    case ZDELETE: pkkout << "DELETE"; break;
+    default: pkkout << "<Unknown: " << regTcPtr->operation << ">"; break;
+    }
+    
+    pkkout << " tab: " << regTcPtr->tableref 
+	   << " frag: " << regTcPtr->fragmentid
+	   << " activeCreat: " << (Uint32)activeCreat;
+    if (LqhKeyReq::getNrCopyFlag(regTcPtr->reqinfo))
+      pkkout << " NrCopy";
+    if (LqhKeyReq::getRowidFlag(regTcPtr->reqinfo))
+      pkkout << " rowid: " << regTcPtr->m_row_id;
+    pkkout << " key: " << regTcPtr->tupkeyData[0];
   }
-#endif
   
-  regTcPtr->transactionState = TcConnectionrec::WAIT_ACC;
-  taccreq = regTcPtr->operation;
-  taccreq = taccreq + (regTcPtr->opSimple << 3);
-  taccreq = taccreq + (regTcPtr->lockType << 4);
-  taccreq = taccreq + (regTcPtr->dirtyOp << 6);
-  taccreq = taccreq + (regTcPtr->replicaType << 7);
-  taccreq = taccreq + (regTcPtr->apiVersionNo << 9);
+  if (likely(activeCreat == Fragrecord::AC_NORMAL))
+  {
+    if (PKK)
+      pkkout << endl;
+    ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr->reqinfo));
+    exec_acckeyreq(signal, tcConnectptr);
+  } 
+  else if (activeCreat == Fragrecord::AC_NR_COPY)
+  {
+    handle_nr_copy(signal, tcConnectptr);
+  }
+  else
+  {
+    ndbassert(activeCreat == Fragrecord::AC_IGNORED);
+    if (PKK)
+      pkkout << " IGNORING (activeCreat == 2)" << endl;
+    
+    signal->theData[0] = tc_ptr_i;
+    regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
+    
+    signal->theData[0] = regTcPtr->tupConnectrec;
+    EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+    jamEntry();
+    
+    execACC_ABORTCONF(signal);
+  }
+}
+
+void
+Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr)
+{
+  Uint32 taccreq;
+  regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC;
+  taccreq = regTcPtr.p->operation;
+  taccreq = taccreq + (regTcPtr.p->opSimple << 3);
+  taccreq = taccreq + (regTcPtr.p->lockType << 4);
+  taccreq = taccreq + (regTcPtr.p->dirtyOp << 6);
+  taccreq = taccreq + (regTcPtr.p->replicaType << 7);
+  taccreq = taccreq + (regTcPtr.p->apiVersionNo << 9);
 /* ************ */
 /*  ACCKEYREQ < */
 /* ************ */
   Uint32 sig0, sig1, sig2, sig3, sig4;
-  sig0 = regTcPtr->accConnectrec;
+  sig0 = regTcPtr.p->accConnectrec;
   sig1 = fragptr.p->accFragptr;
-  sig2 = regTcPtr->hashValue;
-  sig3 = regTcPtr->primKeyLen;
-  sig4 = regTcPtr->transid[0];
+  sig2 = regTcPtr.p->hashValue;
+  sig3 = regTcPtr.p->primKeyLen;
+  sig4 = regTcPtr.p->transid[0];
   signal->theData[0] = sig0;
   signal->theData[1] = sig1;
   signal->theData[2] = taccreq;
@@ -3762,39 +3769,338 @@
   signal->theData[4] = sig3;
   signal->theData[5] = sig4;
 
-  sig0 = regTcPtr->transid[1];
-  sig1 = regTcPtr->tupkeyData[0];
-  sig2 = regTcPtr->tupkeyData[1];
-  sig3 = regTcPtr->tupkeyData[2];
-  sig4 = regTcPtr->tupkeyData[3];
+  sig0 = regTcPtr.p->transid[1];
+  sig1 = regTcPtr.p->tupkeyData[0];
+  sig2 = regTcPtr.p->tupkeyData[1];
+  sig3 = regTcPtr.p->tupkeyData[2];
+  sig4 = regTcPtr.p->tupkeyData[3];
   signal->theData[6] = sig0;
   signal->theData[7] = sig1;
   signal->theData[8] = sig2;
   signal->theData[9] = sig3;
   signal->theData[10] = sig4;
-  if (regTcPtr->primKeyLen > 4) {
+  if (regTcPtr.p->primKeyLen > 4) {
     sendKeyinfoAcc(signal, 11);
   }//if
-  EXECUTE_DIRECT(refToBlock(regTcPtr->tcAccBlockref), GSN_ACCKEYREQ, 
-		 signal, 7 + regTcPtr->primKeyLen);
+  EXECUTE_DIRECT(refToBlock(regTcPtr.p->tcAccBlockref), GSN_ACCKEYREQ, 
+		 signal, 7 + regTcPtr.p->primKeyLen);
   if (signal->theData[0] < RNIL) {
-    signal->theData[0] = tc_ptr_i;
+    signal->theData[0] = regTcPtr.i;
     execACCKEYCONF(signal);
     return;
   } else if (signal->theData[0] == RNIL) {
     ;
   } else {
     ndbrequire(signal->theData[0] == (UintR)-1);
-    signal->theData[0] = tc_ptr_i;
+    signal->theData[0] = regTcPtr.i;
     execACCKEYREF(signal);
   }//if
   return;
 }//Dblqh::prepareContinueAfterBlockedLab()
 
-/* ========================================================================== */
-/* =======                  SEND KEYINFO TO ACC                       ======= */
-/*                                                                            */
-/* ========================================================================== */
+void
+Dblqh::handle_nr_copy(Signal* signal, Ptr<TcConnectionrec> regTcPtr)
+{
+  jam();
+  Uint32 tableId = regTcPtr.p->tableref;
+  Uint32 fragPtr = fragptr.p->tupFragptr;
+  Uint32 op = regTcPtr.p->operation;
+
+  const bool copy = LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo);
+
+  if (!LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
+  {
+    /**
+     * Rowid not set, which means that the primary has finished copying...
+     */
+    jam();
+    if (PKK)
+      pkkout << " Waiting for COPY_ACTIVEREQ" << endl;
+    ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo));
+    regTcPtr.p->activeCreat = Fragrecord::AC_NORMAL;
+    exec_acckeyreq(signal, regTcPtr);
+    return;
+  }
+
+  Uint32* dst = signal->theData+24;
+  bool uncommitted;
+  const int len = c_tup->nr_read_pk(fragPtr, &regTcPtr.p->m_row_id, dst, 
+				    uncommitted);
+  const bool match = (len>0) ? compare_key(regTcPtr.p, dst, len) == 0 : false;
+  
+  if (PKK)
+    pkkout << " len: " << len << " match: " << match 
+	   << " uncommitted: " << uncommitted;
+
+  if (copy)
+  {
+    ndbassert(LqhKeyReq::getGCIFlag(regTcPtr.p->reqinfo));
+    if (match)
+    {
+      /**
+       * Case 1
+       */
+      jam();
+      ndbassert(op == ZINSERT);
+      if (PKK)
+	pkkout << " Changing from INSERT to ZUPDATE" << endl;
+      regTcPtr.p->operation = ZUPDATE;
+      goto run;
+    }
+    else if (len > 0 && op == ZDELETE)
+    {
+      /**
+       * Case 4
+       *   Perform delete using rowid
+       *     primKeyLen == 0
+       *     tupkeyData[0] == rowid
+       */
+      jam();
+      ndbassert(regTcPtr.p->primKeyLen == 0);
+      if (PKK)
+	pkkout << " performing DELETE key: " 
+	       << dst[0] << endl; 
+      regTcPtr.p->tupkeyData[0] = regTcPtr.p->m_row_id.ref();
+      if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
+      {
+	regTcPtr.p->hashValue = calculateHash(tableId, dst);
+      }
+      else
+      {
+	regTcPtr.p->hashValue = md5_hash((Uint64*)dst, len);
+      }
+      goto run;
+    }
+    else if (len == 0 && op == ZDELETE)
+    {
+      /**
+       * Case 7
+       */
+      jam();
+      if (PKK)
+	pkkout << " UPDATE_GCI" << endl; 
+      c_tup->nr_update_gci(fragPtr, &regTcPtr.p->m_row_id, regTcPtr.p->gci);
+      goto update_gci_ignore;
+    }
+    
+    /**
+     * 1) Delete row at specified rowid (if len > 0)
+     * 2) Delete specified row at different rowid (if exists)
+     * 3) Run insert
+     */
+    if (len > 0)
+    {
+      /**
+       * 1) Delete row at specified rowid (if len > 0)
+       */
+      jam();
+      nr_copy_delete_row(signal, regTcPtr, &regTcPtr.p->m_row_id, len);
+    }
+    /**
+     * 2) Delete specified row at different rowid (if exists)    
+     */
+    jam();
+    nr_copy_delete_row(signal, regTcPtr, 0, 0);
+    if (PKK)
+      pkkout << " RUN INSERT" << endl; 
+    goto run;
+  }
+  else
+  {
+    if (!match && op != ZINSERT)
+    {
+      jam();
+      if (PKK)
+	pkkout << " IGNORE " << endl; 
+      goto ignore;
+    }
+    if (match)
+    {
+      jam();
+      if (op != ZDELETE)
+      {
+	if (PKK)
+	  pkkout << " Changing from to ZWRITE" << endl;
+	regTcPtr.p->operation = ZWRITE;
+      }
+      goto run;
+    }
+    
+    /**
+     * 1) Delete row at specified rowid (if len > 0)
+     * 2) Delete specified row at different rowid (if exists)
+     * 3) Run insert
+     */
+    if (len > 0)
+    {
+      /**
+       * 1) Delete row at specified rowid (if len > 0)
+       */
+      jam();
+      nr_copy_delete_row(signal, regTcPtr, &regTcPtr.p->m_row_id, len);
+    }
+
+    /**
+     * 2) Delete specified row at different rowid (if exists)    
+     */
+    jam();
+    nr_copy_delete_row(signal, regTcPtr, 0, 0);
+    if (PKK)
+      pkkout << " RUN op: " << op << endl; 
+    goto run;
+  }
+  
+run:
+  jam();
+  exec_acckeyreq(signal, regTcPtr);
+  return;
+  
+ignore:
+  jam();
+  ndbassert(!LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo));
+update_gci_ignore:
+  regTcPtr.p->activeCreat = Fragrecord::AC_IGNORED;
+  signal->theData[0] = regTcPtr.p->tupConnectrec;
+  EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+
+  regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
+  signal->theData[0] = regTcPtr.i;
+  execACC_ABORTCONF(signal);
+}
+
+int
+Dblqh::compare_key(const TcConnectionrec* regTcPtr, 
+		   const Uint32 * ptr, Uint32 len)
+{
+  if (regTcPtr->primKeyLen != len)
+    return 1;
+  
+  if (len <= 4)
+    return memcmp(ptr, regTcPtr->tupkeyData, 4*len);
+  
+  if (memcmp(ptr, regTcPtr->tupkeyData, sizeof(regTcPtr->tupkeyData)))
+    return 1;
+  
+  len -= (sizeof(regTcPtr->tupkeyData) >> 2);
+  ptr += (sizeof(regTcPtr->tupkeyData) >> 2);
+
+  DatabufPtr regDatabufptr;
+  regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
+  ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+  while(len > 4)
+  {
+    if (memcmp(ptr, regDatabufptr.p, 4*4))
+      return 1;
+
+    ptr += 4;
+    len -= 4;
+    regDatabufptr.i = regDatabufptr.p->nextDatabuf;
+    ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);    
+  }
+
+  if (memcmp(ptr, regDatabufptr.p, 4*len))
+    return 1;
+
+  return 0;
+}
+
+void
+Dblqh::nr_copy_delete_row(Signal* signal, 
+			  Ptr<TcConnectionrec> regTcPtr,
+			  Local_key* rowid, Uint32 len)
+{
+  Ptr<Fragrecord> fragPtr = fragptr;
+
+  Uint32 keylen;
+  Uint32 tableId = regTcPtr.p->tableref;
+  Uint32 accPtr = regTcPtr.p->accConnectrec;
+  
+  signal->theData[0] = accPtr;
+  signal->theData[1] = fragptr.p->accFragptr;
+  signal->theData[2] = ZDELETE + (ZDELETE << 4);
+  signal->theData[5] = regTcPtr.p->transid[0];
+  signal->theData[6] = regTcPtr.p->transid[1];
+  
+  if (rowid)
+  {
+    jam();
+    keylen = 1;
+    if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
+    {
+      signal->theData[3] = calculateHash(tableId, signal->theData+24);
+    }
+    else
+    {
+      signal->theData[3] = md5_hash((Uint64*)(signal->theData+24), len);
+    }
+    signal->theData[4] = 0; // search by local key
+    signal->theData[7] = rowid->ref();
+  }
+  else
+  {
+    jam();
+    keylen = regTcPtr.p->primKeyLen;
+    signal->theData[3] = regTcPtr.p->hashValue;
+    signal->theData[4] = keylen;
+    signal->theData[7] = regTcPtr.p->tupkeyData[0];
+    signal->theData[8] = regTcPtr.p->tupkeyData[1];
+    signal->theData[9] = regTcPtr.p->tupkeyData[2];
+    signal->theData[10] = regTcPtr.p->tupkeyData[3];
+    if (keylen > 4)
+      sendKeyinfoAcc(signal, 11);
+  }
+  const Uint32 ref = refToBlock(regTcPtr.p->tcAccBlockref);
+  EXECUTE_DIRECT(ref, GSN_ACCKEYREQ, signal, 7 + keylen);
+  jamEntry();
+
+  Uint32 retValue = signal->theData[0];
+  ndbrequire(retValue != RNIL); // This should never block...
+  ndbrequire(retValue != (Uint32)-1 || rowid == 0); // rowid should never fail
+
+  if (retValue == (Uint32)-1)
+  {
+    /**
+     * Only delete by pk, may fail
+     */
+    jam();
+    ndbrequire(rowid == 0);
+    signal->theData[0] = accPtr;
+    signal->theData[1] = false;
+    EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
+    jamEntry();
+    return;
+  }
+
+  /**
+   * We found row (and have it locked in ACC)
+   */
+  ndbrequire(regTcPtr.p->m_dealloc == 0);
+  Local_key save = regTcPtr.p->m_row_id;
+  signal->theData[0] = regTcPtr.p->accConnectrec;
+  EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
+  jamEntry();
+
+  ndbrequire(regTcPtr.p->m_dealloc == 1);  
+  signal->theData[0] = regTcPtr.p->fragmentid;
+  signal->theData[1] = regTcPtr.p->tableref;
+  signal->theData[2] = regTcPtr.p->m_row_id.m_page_no;
+  signal->theData[3] = regTcPtr.p->m_row_id.m_page_idx;
+  signal->theData[4] = RNIL;
+  EXECUTE_DIRECT(DBTUP, GSN_TUP_DEALLOCREQ, signal, 5);
+  jamEntry();
+
+  pkkout << "DELETED: " << regTcPtr.p->m_row_id << endl;
+
+  regTcPtr.p->m_dealloc = 0;
+  regTcPtr.p->m_row_id = save;
+  fragptr = fragPtr;
+  tcConnectptr = regTcPtr;
+}
+
+/* ========================================================================= */
+/* =======                 SEND KEYINFO TO ACC                       ======= */
+/*                                                                           */
+/* ========================================================================= */
 void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti) 
 {
   DatabufPtr regDatabufptr;
@@ -3842,6 +4148,15 @@
   jamEntry();
   regTcPtr.i = signal->theData[4];
   
+  if (PKK)
+  {
+    Local_key tmp;
+    tmp.m_page_no = signal->theData[2];
+    tmp.m_page_idx = signal->theData[3];
+    pkkout << "TUP_DEALLOC: " << tmp << 
+      (signal->theData[5] ? " DIRECT " : " DELAYED") << endl;
+  }
+  
   if (signal->theData[5])
   {
     jam();
@@ -3884,7 +4199,6 @@
   }//if
 
   // reset the activeCreat since that is only valid in cases where the record was not present.
-  regTcPtr->activeCreat = ZFALSE;
   /* ------------------------------------------------------------------------
    * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
    * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO
@@ -3895,21 +4209,28 @@
    * ----------------------------------------------------------------------- */
   if (regTcPtr->operation == ZWRITE) 
   {
-    ndbassert(regTcPtr->seqNoReplica == 0);
+    ndbassert(regTcPtr->seqNoReplica == 0 || 
+	      regTcPtr->activeCreat == Fragrecord::AC_NR_COPY);
     Uint32 op= signal->theData[1];
     Uint32 requestInfo = regTcPtr->reqinfo;
     if(likely(op == ZINSERT || op == ZUPDATE))
     {
+      jam();
       regTcPtr->operation = op;
     }
     else
     {
+      jam();
       warningEvent("Convering %d to ZUPDATE", op);
       op = regTcPtr->operation = ZUPDATE;
     }
-    requestInfo &= ~(RI_OPERATION_MASK <<  RI_OPERATION_SHIFT);
-    LqhKeyReq::setOperation(requestInfo, op);
-    regTcPtr->reqinfo = requestInfo;
+    if (regTcPtr->seqNoReplica == 0)
+    {
+      jam();
+      requestInfo &= ~(RI_OPERATION_MASK <<  RI_OPERATION_SHIFT);
+      LqhKeyReq::setOperation(requestInfo, op);
+      regTcPtr->reqinfo = requestInfo;
+    }
   }//if
   
   /* ------------------------------------------------------------------------
@@ -4587,12 +4908,14 @@
   }
   else
   {
+    regTcPtr->m_use_rowid |= 
+      fragptr.p->m_copy_started_state == Fragrecord::AC_NR_COPY;
     LqhKeyReq::setRowidFlag(Treqinfo, regTcPtr->m_use_rowid);
   }
 
   if (LqhKeyReq::getRowidFlag(Treqinfo))
   {
-    ndbassert(LqhKeyReq::getOperation(Treqinfo) == ZINSERT);
+    //ndbassert(LqhKeyReq::getOperation(Treqinfo) == ZINSERT);
   }
   else
   {
@@ -4673,11 +4996,13 @@
   sig0 = regTcPtr->gci;
   Local_key tmp = regTcPtr->m_row_id;
   
-  lqhKeyReq->variableData[nextPos] = tmp.m_page_no;
+  lqhKeyReq->variableData[nextPos + 0] = tmp.m_page_no;
   lqhKeyReq->variableData[nextPos + 1] = tmp.m_page_idx;
-  lqhKeyReq->variableData[nextPos + 2] = sig0;
   nextPos += 2*LqhKeyReq::getRowidFlag(Treqinfo);
+
+  lqhKeyReq->variableData[nextPos + 0] = sig0;
   nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
+
   sig0 = regTcPtr->firstAttrinfo[0];
   sig1 = regTcPtr->firstAttrinfo[1];
   sig2 = regTcPtr->firstAttrinfo[2];
@@ -5375,15 +5700,24 @@
   if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
       (tcConnectptr.p->transid[0] == transid1) &&
       (tcConnectptr.p->transid[1] == transid2)) {
-    if (tcConnectptr.p->seqNoReplica != 0) {
+    if (tcConnectptr.p->seqNoReplica != 0 && 
+	tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
       jam();
       localCommitLab(signal);
       return;
-    } else {
+    } 
+    else if (tcConnectptr.p->seqNoReplica == 0)
+    {
       jam();
       completeTransLastLab(signal);
       return;
-    }//if
+    }
+    else
+    {
+      jam();
+      completeTransNotLastLab(signal);
+      return;
+    }
   }//if
   if (tcConnectptr.p->transactionState != TcConnectionrec::COMMITTED) {
     warningReport(signal, 2);
@@ -5461,15 +5795,21 @@
     return;
     break;
   }//switch
-  if (regTcPtr->seqNoReplica != 0) {
+  if (regTcPtr->seqNoReplica != 0 && 
+      regTcPtr->activeCreat != Fragrecord::AC_NR_COPY) {
     jam();
     localCommitLab(signal);
-    return;
-  } else {
+  } 
+  else if (regTcPtr->seqNoReplica == 0)
+  {
     jam();
     completeTransLastLab(signal);
-    return;
-  }//if
+  }
+  else
+  {
+    jam();
+    completeTransNotLastLab(signal);
+  }
 }//Dblqh::execCOMPLETEREQ()
 
 /* ************> */
@@ -5566,12 +5906,13 @@
     warningReport(signal, 0);
     return;
   }//if
-  if (regTcPtr->seqNoReplica != 0) {
+  if (regTcPtr->seqNoReplica == 0 ||
+      regTcPtr->activeCreat == Fragrecord::AC_NR_COPY) {
     jam();
-    commitReplyLab(signal);
+    localCommitLab(signal);
     return;
   }//if
-  localCommitLab(signal);
+  commitReplyLab(signal);
   return;
 }//Dblqh::commitReqLab()
 
@@ -5694,32 +6035,7 @@
   Uint32 operation = regTcPtr.p->operation;
   Uint32 simpleRead = regTcPtr.p->simpleRead;
   Uint32 dirtyOp = regTcPtr.p->dirtyOp;
-  if (regTcPtr.p->activeCreat == ZFALSE) {
-    if ((cCommitBlocked == true) &&
-        (regFragptr.p->fragActiveStatus == ZTRUE)) {
-      jam();
-/* ------------------------------------------------------------------------- */
-// TUP and/or ACC have problems in writing the undo log to disk fast enough.
-// We must avoid the commit at this time and try later instead. The fragment
-// is also active with a local checkpoint and this commit can generate UNDO
-// log records that overflow the UNDO log buffer.
-/* ------------------------------------------------------------------------- */
-/*---------------------------------------------------------------------------*/
-// We must delay the write of commit info to the log to safe-guard against
-// a crash due to lack of log pages. We temporary stop all log writes to this
-// log part to ensure that we don't get a buffer explosion in the delayed
-// signal buffer instead.
-/*---------------------------------------------------------------------------*/
-      logPartPtr.i = regTcPtr.p->m_log_part_ptr_i;
-      ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
-      linkWaitLog(signal, logPartPtr);
-      regTcPtr.p->transactionState = TcConnectionrec::COMMIT_QUEUED;
-      if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
-        jam();
-        logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
-      }//if
-      return;
-    }//if
+  if (regTcPtr.p->activeCreat != Fragrecord::AC_IGNORED) {
     if (operation != ZREAD) {
       TupCommitReq * const tupCommitReq = 
         (TupCommitReq *)signal->getDataPtrSend();
@@ -5738,9 +6054,32 @@
 	return; // TUP_COMMIT was timesliced
       }
       
+      if (PKK)
+      {
+	pkkout << "COMMIT: ";
+	switch (regTcPtr.p->operation) {
+	case ZREAD: pkkout << "READ"; break;
+	case ZUPDATE: pkkout << "UPDATE"; break;
+	case ZWRITE: pkkout << "WRITE"; break;
+	case ZINSERT: pkkout << "INSERT"; break;
+	case ZDELETE: pkkout << "DELETE"; break;
+	}
+
+	pkkout << " tab: " << regTcPtr.p->tableref 
+	       << " frag: " << regTcPtr.p->fragmentid
+	       << " activeCreat: " << (Uint32)regTcPtr.p->activeCreat;
+	if (LqhKeyReq::getNrCopyFlag(regTcPtr.p->reqinfo))
+	  pkkout << " NrCopy";
+	if (LqhKeyReq::getRowidFlag(regTcPtr.p->reqinfo))
+	  pkkout << " rowid: " << regTcPtr.p->m_row_id;
+	pkkout << " key: " << regTcPtr.p->tupkeyData[0];
+	pkkout << endl;
+      }
+
       Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
       signal->theData[0] = regTcPtr.p->accConnectrec;
       EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
+      
     } else {
       if(!dirtyOp){
 	Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
@@ -5777,7 +6116,7 @@
 
   tcConnectptr.i = tcPtrI;
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-  TcConnectionrec * const tcPtr = tcConnectptr.p;
+  TcConnectionrec * tcPtr = tcConnectptr.p;
 
   ndbrequire(tcPtr->transactionState == TcConnectionrec::WAIT_TUP_COMMIT);
 
@@ -5796,26 +6135,38 @@
 
 void
 Dblqh::tupcommit_conf(Signal* signal, 
-		      TcConnectionrec * regTcPtr,
+		      TcConnectionrec * tcPtrP,
 		      Fragrecord * regFragptr)
 {
-  Uint32 dirtyOp = regTcPtr->dirtyOp;
-  Uint32 seqNoReplica = regTcPtr->seqNoReplica;
-  if (regTcPtr->gci > regFragptr->newestGci) {
+  Uint32 dirtyOp = tcPtrP->dirtyOp;
+  Uint32 seqNoReplica = tcPtrP->seqNoReplica;
+  Uint32 activeCreat = tcPtrP->activeCreat;
+  if (tcPtrP->gci > regFragptr->newestGci) {
     jam();
 /* ------------------------------------------------------------------------- */
 /*IT IS THE FIRST TIME THIS GLOBAL CHECKPOINT IS INVOLVED IN UPDATING THIS   */
 /*FRAGMENT. UPDATE THE VARIABLE THAT KEEPS TRACK OF NEWEST GCI IN FRAGMENT   */
 /* ------------------------------------------------------------------------- */
-    regFragptr->newestGci = regTcPtr->gci;
+    regFragptr->newestGci = tcPtrP->gci;
   }//if
-  if (dirtyOp != ZTRUE) {
-    if (seqNoReplica != 0) {
+  if (dirtyOp != ZTRUE) 
+  {
+    if (seqNoReplica == 0 || activeCreat == Fragrecord::AC_NR_COPY)
+    {
       jam();
-      completeTransNotLastLab(signal);
+      commitReplyLab(signal);
       return;
     }//if
-    commitReplyLab(signal);
+    if (seqNoReplica == 0)
+    {
+      jam();
+      completeTransLastLab(signal);
+    }
+    else
+    {      
+      jam();
+      completeTransNotLastLab(signal);
+    }
     return;
   } else {
 /* ------------------------------------------------------------------------- */
@@ -5823,11 +6174,11 @@
 /*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */
 /*THOSE SIGNALS INTERNALLY.                                                  */
 /* ------------------------------------------------------------------------- */
-    if (regTcPtr->abortState == TcConnectionrec::ABORT_IDLE) {
+    if (tcPtrP->abortState == TcConnectionrec::ABORT_IDLE) {
       jam();
       packLqhkeyreqLab(signal);
     } else {
-      ndbrequire(regTcPtr->abortState != TcConnectionrec::NEW_FROM_TC);
+      ndbrequire(tcPtrP->abortState != TcConnectionrec::NEW_FROM_TC);
       jam();
       sendLqhTransconf(signal, LqhTransConf::Committed);
       cleanUp(signal);
@@ -6046,7 +6397,7 @@
     sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB);
   }//if
   regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC;
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
 
   const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
   if(commitAckMarker != RNIL){
@@ -6106,7 +6457,7 @@
   regTcPtr->reqBlockref = reqBlockref;
   regTcPtr->reqRef = reqPtr;
   regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   abortCommonLab(signal);
   return;
 }//Dblqh::execABORTREQ()
@@ -6151,51 +6502,49 @@
   }//switch
   const Uint32 errCode = terrorCode; 
   tcPtr->errorCode = errCode;
-/* ------------------------------------------------------------------------- */
-/*WHEN AN ABORT FROM TC ARRIVES IT COULD ACTUALLY BE A CORRECT BEHAVIOUR     */
-/*SINCE THE TUPLE MIGHT NOT HAVE ARRIVED YET OR ALREADY HAVE BEEN INSERTED.  */
-/* ------------------------------------------------------------------------- */
-  if (tcPtr->activeCreat == ZTRUE) {
-    jam();
-/* ------------------------------------------------------------------------- */
-/*THIS IS A NORMAL EVENT DURING CREATION OF A FRAGMENT. PERFORM ABORT IN     */
-/*TUP AND ACC AND THEN CONTINUE WITH NORMAL COMMIT PROCESSING. IF THE ERROR  */
-/*HAPPENS TO BE A SERIOUS ERROR THEN PERFORM ABORT PROCESSING AS NORMAL.     */
-/* ------------------------------------------------------------------------- */
+
+  if (PKK)
+  {
+    pkkout << "ACCKEYREF: " << errCode << " ";
     switch (tcPtr->operation) {
-    case ZUPDATE:
-    case ZDELETE:
-      jam();
-      if (errCode != ZNO_TUPLE_FOUND) {
-        jam();
-/* ------------------------------------------------------------------------- */
-/*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE        */
-/*TRANSACTION. NO SPECIAL HANDLING IS NEEDED.                                */
-/* ------------------------------------------------------------------------- */
-        tcPtr->activeCreat = ZFALSE;
-      }//if
+    case ZREAD: pkkout << "READ"; break;
+    case ZUPDATE: pkkout << "UPDATE"; break;
+    case ZWRITE: pkkout << "WRITE"; break;
+    case ZINSERT: pkkout << "INSERT"; break;
+    case ZDELETE: pkkout << "DELETE"; break;
+    default: pkkout << "<Unknown: " << tcPtr->operation << ">"; break;
+    }
+    
+    pkkout << " tab: " << tcPtr->tableref 
+	   << " frag: " << tcPtr->fragmentid
+	   << " activeCreat: " << (Uint32)tcPtr->activeCreat;
+    if (LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo))
+      pkkout << " NrCopy";
+    if (LqhKeyReq::getRowidFlag(tcPtr->reqinfo))
+      pkkout << " rowid: " << tcPtr->m_row_id;
+    pkkout << " key: " << tcPtr->tupkeyData[0];
+    pkkout << endl;
+    
+  }
+
+  if (tcPtr->activeCreat == Fragrecord::AC_NR_COPY)
+  {
+    jam();
+    Uint32 op = tcPtr->operation;
+    switch(errCode){
+    case ZNO_TUPLE_FOUND:
+      ndbrequire(op == ZDELETE);
       break;
-    case ZINSERT:
-      jam();
-      if (errCode != ZTUPLE_ALREADY_EXIST) {
-        jam();
-/* ------------------------------------------------------------------------- */
-/*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE        */
-/*TRANSACTION. NO SPECIAL HANDLING IS NEEDED.                                */
-/* ------------------------------------------------------------------------- */
-        tcPtr->activeCreat = ZFALSE;
-      }//if
       break;
     default:
-      jam();
-/* ------------------------------------------------------------------------- */
-/*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE        */
-/*TRANSACTION. NO SPECIAL HANDLING IS NEEDED.                                */
-/* ------------------------------------------------------------------------- */
-      tcPtr->activeCreat = ZFALSE;
-      break;
-    }//switch
-  } else {
+      ndbrequire(false);
+    }
+    tcPtr->activeCreat = Fragrecord::AC_IGNORED;
+  }
+  else 
+  {
+    ndbrequire(!LqhKeyReq::getNrCopyFlag(tcPtr->reqinfo));
+    
     /**
      * Only primary replica can get ZTUPLE_ALREADY_EXIST || ZNO_TUPLE_FOUND
      *
@@ -6209,11 +6558,16 @@
      *
      * -> ZNO_TUPLE_FOUND is possible
      */
+    ndbrequire(tcPtr->operation == ZREAD 
+	       || tcPtr->operation == ZREAD_EX
+	       || tcPtr->seqNoReplica == 0);
+    
     ndbrequire
       (tcPtr->seqNoReplica == 0 ||
        errCode != ZTUPLE_ALREADY_EXIST ||
        (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple)));
   }
+  
   tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
   abortCommonLab(signal);
   return;
@@ -6226,7 +6580,7 @@
     jam();
     return;
   }//if
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
   regTcPtr->errorCode = terrorCode;
   abortStateHandlerLab(signal);
@@ -6406,7 +6760,7 @@
    *       ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED 
    *       WITH NORMAL ABORT HANDLING.
    * ----------------------------------------------------------------------- */
-  regTcPtr->activeCreat = ZFALSE;
+  regTcPtr->activeCreat = Fragrecord::AC_NORMAL;
   abortCommonLab(signal);
   return;
 }//Dblqh::abortErrorLab()
@@ -6415,7 +6769,8 @@
 {
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
-  if(regTcPtr->activeCreat != ZTRUE && commitAckMarker != RNIL){
+  if(regTcPtr->activeCreat != Fragrecord::AC_IGNORED && 
+     commitAckMarker != RNIL){
     /**
      * There is no NR ongoing and we have a marker
      */
@@ -6480,42 +6835,12 @@
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   fragptr.i = regTcPtr->fragmentptr;
   c_fragment_pool.getPtr(fragptr);
-  if ((cCommitBlocked == true) &&
-      (fragptr.p->fragActiveStatus == ZTRUE) &&
-      (canBlock == true) &&
-      (regTcPtr->operation != ZREAD)) {
-    jam();
-/* ------------------------------------------------------------------------- */
-// TUP and/or ACC have problems in writing the undo log to disk fast enough.
-// We must avoid the abort at this time and try later instead. The fragment
-// is also active with a local checkpoint and this commit can generate UNDO
-// log records that overflow the UNDO log buffer.
-//
-// In certain situations it is simply too complex to insert a wait state here
-// since ACC is active and we cannot release the operation from the active
-// list without causing great complexity.
-/* ------------------------------------------------------------------------- */
-/*---------------------------------------------------------------------------*/
-// We must delay the write of abort info to the log to safe-guard against
-// a crash due to lack of log pages. We temporary stop all log writes to this
-// log part to ensure that we don't get a buffer explosion in the delayed
-// signal buffer instead.
-/*---------------------------------------------------------------------------*/
-    logPartPtr.i = regTcPtr->m_log_part_ptr_i;
-    ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
-    linkWaitLog(signal, logPartPtr);
-    regTcPtr->transactionState = TcConnectionrec::ABORT_QUEUED;
-    if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
-      jam();
-      logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
-    }//if
-    return;
-  }//if
   signal->theData[0] = regTcPtr->tupConnectrec;
   EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
   regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
   signal->theData[0] = regTcPtr->accConnectrec;
-  EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 1);
+  signal->theData[1] = true;
+  EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
   /* ------------------------------------------------------------------------
    * We need to insert a real-time break by sending ACC_ABORTCONF through the
    * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
@@ -6536,11 +6861,11 @@
   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
   TcConnectionrec * const regTcPtr = tcConnectptr.p;
   ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
-  if (regTcPtr->activeCreat == ZTRUE) {
+  if (regTcPtr->activeCreat == Fragrecord::AC_IGNORED) {
     /* ----------------------------------------------------------------------
      * A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE
      * WITH NORMAL COMMIT PROCESSING.
-     * ---------------------------------------------------------------------- */
+     * --------------------------------------------------------------------- */
     if (regTcPtr->currTupAiLen == regTcPtr->totReclenAi) {
       jam();
       regTcPtr->abortState = TcConnectionrec::ABORT_IDLE;
@@ -6856,6 +7181,7 @@
 	       * THE RECEIVER OF THE COPY HAVE FAILED. 
 	       * WE HAVE TO CLOSE THE COPY PROCESS. 
 	       * ----------------------------------------------------------- */
+	      ndbout_c("close copy");
               tcConnectptr.p->tcNodeFailrec = tcNodeFailptr.i;
               tcConnectptr.p->abortState = TcConnectionrec::NEW_FROM_TC;
               closeCopyRequestLab(signal);
@@ -9249,7 +9575,7 @@
   const Uint32 copyPtr = copyFragReq->userPtr;
   const Uint32 userRef = copyFragReq->userRef;
   const Uint32 nodeId = copyFragReq->nodeId;
-  const Uint32 gci = 0; //copyFragReq->gci = cnewestGci;
+  const Uint32 gci = copyFragReq->gci;
   
   ndbrequire(cnoActiveCopy < 3);
   ndbrequire(getFragmentrec(signal, fragId));
@@ -9424,6 +9750,19 @@
 
 void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal) 
 {
+  /**
+   * Start sending ROWID for all operations from now on
+   */
+  fragptr.p->m_copy_started_state = Fragrecord::AC_NR_COPY;
+
+  if (0)
+  {
+    ndbout_c("STOPPING COPY (%d -> %d %d %d)",
+	     scanptr.p->scanBlockref,
+	     scanptr.p->scanAccPtr, RNIL, NextScanReq::ZSCAN_NEXT);
+    return;
+  }
+  
   scanptr.i = tcConnectptr.p->tcScanRec;
   c_scanRecordPool.getPtr(scanptr);
   signal->theData[0] = scanptr.p->scanAccPtr;
@@ -9471,32 +9810,87 @@
     return;
   }//if
 
-  // If accOperationPtr == RNIL no record was returned by ACC
-  if (nextScanConf->accOperationPtr == RNIL) {
-    jam();
-    signal->theData[0] = scanptr.p->scanAccPtr;
-    signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
-    sendSignal(scanptr.p->scanBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
-    return;      
-  }
-
-  set_acc_ptr_in_scan_record(scanptr.p, 0, nextScanConf->accOperationPtr);
-  initCopyTc(signal);
-  tcConnectptr.p->m_use_rowid = true;
-  tcConnectptr.p->m_row_id = scanptr.p->m_row_id;
+  TcConnectionrec * tcConP = tcConnectptr.p;
   
-  Fragrecord* fragPtrP= fragptr.p;
-  scanptr.p->scanState = ScanRecord::WAIT_TUPKEY_COPY;
-  tcConnectptr.p->transactionState = TcConnectionrec::COPY_TUPKEY;
-  if(tcConnectptr.p->m_disk_table)
+  tcConP->m_use_rowid = true;
+  tcConP->m_row_id = scanptr.p->m_row_id;
+  
+  if (signal->getLength() == 7)
   {
-    next_scanconf_load_diskpage(signal, scanptr, tcConnectptr,fragPtrP);
+    jam();
+    ndbrequire(nextScanConf->accOperationPtr == RNIL);
+    initCopyTc(signal, ZDELETE);
+    set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL);
+    tcConP->gci = nextScanConf->gci;
+
+    tcConP->primKeyLen = 0;
+    tcConP->totSendlenAi = 0;
+    tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
+
+/*---------------------------------------------------------------------------*/
+// To avoid using up too many operation records in ACC we will increase the
+// constant to ensure that we never send more than 40 records at a time.
+// This is where the constant 56 comes from. For long records this constant
+// will not matter that much. The current maximum is 6000 words outstanding
+// (including a number of those 56 words not really sent). We also have to
+// ensure that there are never more simultaneous usage of these operation
+// records to ensure that node recovery does not fail because of simultaneous
+// scanning.
+/*---------------------------------------------------------------------------*/
+    UintR TnoOfWords = 8;
+    TnoOfWords = TnoOfWords + MAGIC_CONSTANT;
+    TnoOfWords = TnoOfWords + (TnoOfWords >> 2);
+    
+    /*-----------------------------------------------------------------
+     * NOTE for transid1!
+     * Transid1 in the tcConnection record is used to load regulate the
+     * copy(node recovery) process.
+     * The number of outstanding words are written in the transid1 
+     * variable. This will be sent to the starting node in the 
+     * LQHKEYREQ signal and when the answer is returned in the LQHKEYCONF
+     * we can reduce the number of outstanding words and check to see
+     * if more LQHKEYREQ signals should be sent.
+     * 
+     * However efficient, this method is rather unsafe in such a way that
+     * it overwrites the transid1 original data.
+     *
+     * Also see TR 587.
+     *----------------------------------------------------------------*/
+    tcConP->transid[0] = TnoOfWords; // Data overload, see note!
+    packLqhkeyreqLab(signal);
+    tcConP->copyCountWords += TnoOfWords;
+    scanptr.p->scanState = ScanRecord::WAIT_LQHKEY_COPY;
+    if (tcConP->copyCountWords < cmaxWordsAtNodeRec) {
+      nextRecordCopy(signal);
+    }
+    return;
   }
   else
   {
-    next_scanconf_tupkeyreq(signal, scanptr, tcConnectptr.p, fragPtrP, RNIL);
+    // If accOperationPtr == RNIL no record was returned by ACC
+    if (nextScanConf->accOperationPtr == RNIL) {
+      jam();
+      signal->theData[0] = scanptr.p->scanAccPtr;
+      signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
+      sendSignal(scanptr.p->scanBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
+      return;      
+    }
+    
+    initCopyTc(signal, ZINSERT);
+    set_acc_ptr_in_scan_record(scanptr.p, 0, nextScanConf->accOperationPtr);
+    
+    Fragrecord* fragPtrP= fragptr.p;
+    scanptr.p->scanState = ScanRecord::WAIT_TUPKEY_COPY;
+    tcConP->transactionState = TcConnectionrec::COPY_TUPKEY;
+    if(tcConP->m_disk_table)
+    {
+      next_scanconf_load_diskpage(signal, scanptr, tcConnectptr,fragPtrP);
+    }
+    else
+    {
+      next_scanconf_tupkeyreq(signal, scanptr, tcConP, fragPtrP, RNIL);
+    }
   }
-  return;
 }//Dblqh::nextScanConfCopyLab()
 
 
@@ -9739,10 +10133,23 @@
   c_scanRecordPool.getPtr(scanptr);
   tcConnectptr.p->errorCode = 0;
   Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0, false);
-  signal->theData[0] = scanptr.p->scanAccPtr;
-  signal->theData[1] = acc_op_ptr;
-  signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
-  sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  if (acc_op_ptr != RNIL)
+  {
+    signal->theData[0] = scanptr.p->scanAccPtr;
+    signal->theData[1] = acc_op_ptr;
+    signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
+    sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  }
+  else
+  {
+    /**
+     * No need to commit (unlock)
+     */
+    signal->theData[0] = scanptr.p->scanAccPtr;
+    signal->theData[1] = RNIL;
+    signal->theData[2] = NextScanReq::ZSCAN_NEXT;
+    sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+  }
   return;
 }//Dblqh::continueCopyAfterBlockedLab()
 
@@ -9773,6 +10180,12 @@
   tcConnectptr.p->transid[1] = 0;
   fragptr.i = tcConnectptr.p->fragmentptr;
   c_fragment_pool.getPtr(fragptr);
+
+  /**
+   * Stop sending ROWID for all operations from now on
+   */
+  fragptr.p->m_copy_started_state = Fragrecord::AC_NORMAL;
+  
   scanptr.i = tcConnectptr.p->tcScanRec;
   c_scanRecordPool.getPtr(scanptr);
   scanptr.p->scanState = ScanRecord::WAIT_CLOSE_COPY;
@@ -9812,7 +10225,7 @@
   c_scanRecordPool.getPtr(scanptr);
   signal->theData[0] = scanptr.p->scanAccPtr;
   signal->theData[1] = RNIL;
-  signal->theData[2] = ZCOPY_CLOSE;
+  signal->theData[2] = NextScanReq::ZSCAN_CLOSE;
   sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
   return;
 }//Dblqh::continueCloseCopyAfterBlockedLab()
@@ -9898,7 +10311,7 @@
       conf->tableId = tcConnectptr.p->tableref;
       conf->fragId = tcConnectptr.p->fragmentid;
       sendSignal(tcConnectptr.p->clientBlockref, GSN_COPY_FRAGCONF, signal,
-                 CopyFragConf::SignalLength, JBB);
+		 CopyFragConf::SignalLength, JBB);
     }//if
   }//if
   releaseActiveCopy(signal);
@@ -9917,6 +10330,7 @@
 void Dblqh::closeCopyRequestLab(Signal* signal) 
 {
   scanptr.p->scanErrorCounter++;
+  ndbout_c("closeCopyRequestLab: scanState: %d", scanptr.p->scanState);
   switch (scanptr.p->scanState) {
   case ScanRecord::WAIT_TUPKEY_COPY:
   case ScanRecord::WAIT_NEXT_SCAN_COPY:
@@ -10147,7 +10561,7 @@
 /*                                                                           */
 /*       SUBROUTINE SHORT NAME = ICT                                         */
 /* ========================================================================= */
-void Dblqh::initCopyTc(Signal* signal) 
+void Dblqh::initCopyTc(Signal* signal, Operation_t op) 
 {
   tcConnectptr.p->operation = ZREAD;
   tcConnectptr.p->apiVersionNo = 0;
@@ -10156,8 +10570,9 @@
   Uint32 reqinfo = 0;
   LqhKeyReq::setDirtyFlag(reqinfo, 1);
   LqhKeyReq::setSimpleFlag(reqinfo, 1);
-  LqhKeyReq::setOperation(reqinfo, ZINSERT);
+  LqhKeyReq::setOperation(reqinfo, op);
   LqhKeyReq::setGCIFlag(reqinfo, 1);
+  LqhKeyReq::setNrCopyFlag(reqinfo, 1);
                                         /* AILen in LQHKEYREQ  IS ZERO */
   tcConnectptr.p->reqinfo = reqinfo;
 /* ------------------------------------------------------------------------ */
@@ -10477,7 +10892,6 @@
    * ----------------------------------------------------------------------- */
   fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
   c_fragment_pool.getPtr(fragptr);
-  fragptr.p->fragActiveStatus = ZFALSE;
   
   contChkpNextFragLab(signal);
   return;
@@ -10931,7 +11345,29 @@
   const Uint32 dihBlockRef = saveReq->dihBlockRef;
   const Uint32 dihPtr = saveReq->dihPtr;
   const Uint32 gci = saveReq->gci;
-  
+
+  if(getNodeState().startLevel >= NodeState::SL_STOPPING_4){
+    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
+    saveRef->dihPtr = dihPtr;
+    saveRef->nodeId = getOwnNodeId();
+    saveRef->gci    = gci;
+    saveRef->errorCode = GCPSaveRef::NodeShutdownInProgress;
+    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
+	       GCPSaveRef::SignalLength, JBB);
+    return;
+  }
+
+  if(getNodeState().getNodeRestartInProgress()){
+    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
+    saveRef->dihPtr = dihPtr;
+    saveRef->nodeId = getOwnNodeId();
+    saveRef->gci    = gci;
+    saveRef->errorCode = GCPSaveRef::NodeRestartInProgress;
+    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
+	       GCPSaveRef::SignalLength, JBB);
+    return;
+  }
+
   ndbrequire(gci >= cnewestCompletedGci);
   
   if (gci == cnewestCompletedGci) {
@@ -10967,30 +11403,7 @@
   }//if
   
   ndbrequire(ccurrentGcprec == RNIL);
-  
-  
-  if(getNodeState().startLevel >= NodeState::SL_STOPPING_4){
-    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
-    saveRef->dihPtr = dihPtr;
-    saveRef->nodeId = getOwnNodeId();
-    saveRef->gci    = gci;
-    saveRef->errorCode = GCPSaveRef::NodeShutdownInProgress;
-    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
-	       GCPSaveRef::SignalLength, JBB);
-    return;
-  }
-
-  if(getNodeState().getNodeRestartInProgress()){
-    GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
-    saveRef->dihPtr = dihPtr;
-    saveRef->nodeId = getOwnNodeId();
-    saveRef->gci    = gci;
-    saveRef->errorCode = GCPSaveRef::NodeRestartInProgress;
-    sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal, 
-	       GCPSaveRef::SignalLength, JBB);
-    return;
-  }
-  
+    
   ccurrentGcprec = 0;
   gcpPtr.i = ccurrentGcprec;
   ptrCheckGuard(gcpPtr, cgcprecFileSize, gcpRecord);
@@ -13075,13 +13488,6 @@
    *   WE ALSO NEED TO SET CNEWEST_GCI TO ENSURE THAT LOG RECORDS ARE EXECUTED
    *   WITH A PROPER GCI.
    *------------------------------------------------------------------------ */
-  if (cstartType == NodeState::ST_NODE_RESTART) {
-    jam();
-    signal->theData[0] = ZSR_PHASE3_START;
-    signal->theData[1] = ZSR_PHASE2_COMPLETED;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-    return;
-  }//if
   if(cstartType == NodeState::ST_INITIAL_NODE_RESTART){
     jam();
     StartRecConf * conf = (StartRecConf*)signal->getDataPtrSend();
@@ -13229,16 +13635,9 @@
      *    WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED 
      *    ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
      * --------------------------------------------------------------------- */
-    for (Uint32 i = 0; i < cnoOfNodes; i++) {
-      jam();
-      if (cnodeStatus[i] == ZNODE_UP) {
-        jam();
-        ndbrequire(cnodeData[i] < MAX_NDB_NODES);
-        BlockReference ref = calcLqhBlockRef(cnodeData[i]);
-        signal->theData[0] = cownNodeid;
-        sendSignal(ref, GSN_EXEC_SRREQ, signal, 1, JBB);
-      }//if
-    }//for
+    NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+    signal->theData[0] = cownNodeid;
+    sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
     return;
   } else {
     jam();
@@ -13279,8 +13678,15 @@
       c_lcp_complete_fragments.remove(fragptr);
       c_redo_complete_fragments.add(fragptr);
       
-      fragptr.p->fragStatus = Fragrecord::FSACTIVE;
-      fragptr.p->logFlag = Fragrecord::STATE_TRUE;
+      if (!getNodeState().getNodeRestartInProgress())
+      {
+	fragptr.p->logFlag = Fragrecord::STATE_TRUE;
+	fragptr.p->fragStatus = Fragrecord::FSACTIVE;
+      } 
+      else
+      {	
+	fragptr.p->fragStatus = Fragrecord::ACTIVE_CREATION;	
+      }
       signal->theData[0] = fragptr.p->srUserptr;
       signal->theData[1] = cownNodeid;
       sendSignal(fragptr.p->srBlockref, GSN_START_FRAGCONF, signal, 2, JBB);
@@ -13378,32 +13784,22 @@
   jamEntry();
   Uint32 nodeId = signal->theData[0];
   arrGuard(nodeId, MAX_NDB_NODES);
-  cnodeExecSrState[nodeId] = ZEXEC_SR_COMPLETED;
-  ndbrequire(cnoOfNodes < MAX_NDB_NODES);
-  for (Uint32 i = 0; i < cnoOfNodes; i++) {
+  m_sr_exec_sr_conf.set(nodeId);
+  if (!m_sr_nodes.equal(m_sr_exec_sr_conf))
+  {
     jam();
-    if (cnodeStatus[i] == ZNODE_UP) {
-      jam();
-      nodeId = cnodeData[i];
-      arrGuard(nodeId, MAX_NDB_NODES);
-      if (cnodeExecSrState[nodeId] != ZEXEC_SR_COMPLETED) {
-        jam();
-	/* ------------------------------------------------------------------
-	 *  ALL NODES HAVE NOT REPORTED COMPLETION OF EXECUTING FRAGMENT 
-	 *  LOGS YET.
-	 * ----------------------------------------------------------------- */
-        return;
-      }//if
-    }//if
-  }//for
-
+    /* ------------------------------------------------------------------
+     *  ALL NODES HAVE NOT REPORTED COMPLETION OF EXECUTING FRAGMENT 
+     *  LOGS YET.
+     * ----------------------------------------------------------------- */
+    return;
+  }
+  
   /* ------------------------------------------------------------------------
    *  CLEAR NODE SYSTEM RESTART EXECUTION STATE TO PREPARE FOR NEXT PHASE OF
    *  LOG EXECUTION.
    * ----------------------------------------------------------------------- */
-  for (nodeId = 0; nodeId < MAX_NDB_NODES; nodeId++) {
-    cnodeExecSrState[nodeId] = ZSTART_SR;
-  }//for
+  m_sr_exec_sr_conf.clear();
 
   /* ------------------------------------------------------------------------
    *  NOW CHECK IF ALL FRAGMENTS IN THIS PHASE HAVE COMPLETED. IF SO START THE
@@ -13487,29 +13883,19 @@
   jamEntry();
   Uint32 nodeId = signal->theData[0];
   ndbrequire(nodeId < MAX_NDB_NODES);
-  cnodeSrState[nodeId] = ZEXEC_SR_COMPLETED;
-  ndbrequire(cnoOfNodes < MAX_NDB_NODES);
-  for (Uint32 i = 0; i < cnoOfNodes; i++) {
+  m_sr_exec_sr_req.set(nodeId);
+  if (!m_sr_exec_sr_req.equal(m_sr_nodes))
+  {
     jam();
-    if (cnodeStatus[i] == ZNODE_UP) {
-      jam();
-      nodeId = cnodeData[i];
-      if (cnodeSrState[nodeId] != ZEXEC_SR_COMPLETED) {
-        jam();
-	/* ------------------------------------------------------------------
-	 *  ALL NODES HAVE NOT REPORTED COMPLETION OF SENDING EXEC_FRAGREQ YET.
-	 * ----------------------------------------------------------------- */
-        return;
-      }//if
-    }//if
-  }//for
+    return;
+  }
+
   /* ------------------------------------------------------------------------
    *  CLEAR NODE SYSTEM RESTART STATE TO PREPARE FOR NEXT PHASE OF LOG 
    *  EXECUTION
    * ----------------------------------------------------------------------- */
-  for (nodeId = 0; nodeId < MAX_NDB_NODES; nodeId++) {
-    cnodeSrState[nodeId] = ZSTART_SR;
-  }//for
+  m_sr_exec_sr_req.clear();
+
   if (csrPhasesCompleted != 0) {
     /* ----------------------------------------------------------------------
      *       THE FIRST PHASE MUST ALWAYS EXECUTE THE LOG.
@@ -13517,7 +13903,7 @@
     if (cnoFragmentsExecSr == 0) {
       jam();
       /* --------------------------------------------------------------------
-       *   THERE WERE NO FRAGMENTS THAT NEEDED TO EXECUTE THE LOG IN THIS PHASE.
+       *  THERE WERE NO FRAGMENTS THAT NEEDED TO EXECUTE THE LOG IN THIS PHASE.
        * ------------------------------------------------------------------- */
       srPhase3Comp(signal);
       return;
@@ -13561,11 +13947,6 @@
   if (csrPhaseStarted == ZSR_NO_PHASE_STARTED) {
     jam();
     csrPhaseStarted = tsrPhaseStarted;
-    if (cstartType == NodeState::ST_NODE_RESTART) {
-      ndbrequire(cinitialStartOngoing == ZTRUE);
-      cinitialStartOngoing = ZFALSE;
-      checkStartCompletedLab(signal);
-    }//if
     return;
   }//if  
   ndbrequire(csrPhaseStarted != tsrPhaseStarted);
@@ -13588,22 +13969,12 @@
       logPartPtr.p->logLastGci = 2;
     }//if
   }//for
-  if (cstartType == NodeState::ST_NODE_RESTART) {
-    jam();
-    /* ----------------------------------------------------------------------
-     *  FOR A NODE RESTART WE HAVE NO FRAGMENTS DEFINED YET. 
-     *  THUS WE CAN SKIP THAT PART
-     * --------------------------------------------------------------------- */
-    signal->theData[0] = ZSR_GCI_LIMITS;
-    signal->theData[1] = RNIL;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-  } else {
-    jam();
-    c_lcp_complete_fragments.first(fragptr);
-    signal->theData[0] = ZSR_GCI_LIMITS;
-    signal->theData[1] = fragptr.i;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-  }//if
+  
+  jam();
+  c_lcp_complete_fragments.first(fragptr);
+  signal->theData[0] = ZSR_GCI_LIMITS;
+  signal->theData[1] = fragptr.i;
+  sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
   return;
 }//Dblqh::srPhase3Start()
 
@@ -14497,9 +14868,6 @@
     if (unlikely(terrorCode != ZTUPLE_ALREADY_EXIST && terrorCode != 899))
       goto error;
     
-    if (terrorCode == 899)
-      ndbout_c("899");
-    
     break;
   default:
     goto error;
@@ -14583,24 +14951,11 @@
    *   ALL LOG PARTS HAVE COMPLETED THE EXECUTION OF THE LOG. WE CAN NOW START
    *   SENDING THE EXEC_FRAGCONF SIGNALS TO ALL INVOLVED FRAGMENTS.
    * ----------------------------------------------------------------------- */
-  if (cstartType != NodeState::ST_NODE_RESTART) {
-    jam();
-    c_lcp_complete_fragments.first(fragptr);
-    signal->theData[0] = ZSEND_EXEC_CONF;
-    signal->theData[1] = fragptr.i;
-    sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
-  } else {
-    jam();
-    /* ----------------------------------------------------------------------
-     *  FOR NODE RESTART WE CAN SKIP A NUMBER OF STEPS SINCE WE HAVE NO 
-     *  FRAGMENTS DEFINED AT THIS POINT. OBVIOUSLY WE WILL NOT NEED TO 
-     *  EXECUTE ANY MORE LOG STEPS EITHER AND THUS WE CAN IMMEDIATELY 
-     *  START FINDING THE END AND THE START OF THE LOG.
-     * --------------------------------------------------------------------- */
-    csrPhasesCompleted = 3;
-    execSrCompletedLab(signal);
-    return;
-  }//if
+  jam();
+  c_lcp_complete_fragments.first(fragptr);
+  signal->theData[0] = ZSEND_EXEC_CONF;
+  signal->theData[1] = fragptr.i;
+  sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
   return;
 }//Dblqh::execLogComp()
 
@@ -14989,7 +15344,7 @@
 /*       YET. THIS IS POSSIBLE IF ACTIVE CREATION OF THE FRAGMENT IS       */
 /*       ONGOING.                                                          */
 /*************************************************************************>*/
-    if (tcConnectptr.p->activeCreat == ZTRUE) {
+    if (tcConnectptr.p->activeCreat == Fragrecord::AC_IGNORED) {
         jam();
 /*************************************************************************>*/
 /*       ONGOING ABORTS DURING ACTIVE CREATION MUST SAVE THE ATTRIBUTE INFO*/
@@ -15034,7 +15389,7 @@
 /*       COMPLETED AND THAT THE ERROR CODE IS PROPERLY SET                 */
 /*************************************************************************>*/
         tcConnectptr.p->errorCode = terrorCode;
-        tcConnectptr.p->activeCreat = ZFALSE;
+        tcConnectptr.p->activeCreat = Fragrecord::AC_NORMAL;
         if (tcConnectptr.p->transactionState == 
 	    TcConnectionrec::WAIT_AI_AFTER_ABORT) {
           jam();
@@ -15644,7 +15999,6 @@
     refresh_watch_dog();
     new (fragptr.p) Fragrecord();
     fragptr.p->fragStatus = Fragrecord::FREE;
-    fragptr.p->fragActiveStatus = ZFALSE;
     fragptr.p->execSrStatus = Fragrecord::IDLE;
     fragptr.p->srStatus = Fragrecord::SS_IDLE;
   }
@@ -15803,10 +16157,9 @@
   switch (data) {
   case 0:
     jam();
-    for (i = 0; i < MAX_NDB_NODES; i++) {
-      cnodeSrState[i] = ZSTART_SR;
-      cnodeExecSrState[i] = ZSTART_SR;
-    }//for
+    m_sr_nodes.clear();
+    m_sr_exec_sr_req.clear();
+    m_sr_exec_sr_conf.clear();
     for (i = 0; i < 1024; i++) {
       ctransidHash[i] = RNIL;
     }//for
@@ -15814,16 +16167,11 @@
       cactiveCopy[i] = RNIL;
     }//for
     cnoActiveCopy = 0;
-    cCounterAccCommitBlocked = 0;
-    cCounterTupCommitBlocked = 0;
-    caccCommitBlocked = false;
-    ctupCommitBlocked = false;
-    cCommitBlocked = false;
     ccurrentGcprec = RNIL;
     caddNodeState = ZFALSE;
     cstartRecReq = ZFALSE;
-    cnewestGci = (UintR)-1;
-    cnewestCompletedGci = (UintR)-1;
+    cnewestGci = ~0;
+    cnewestCompletedGci = ~0;
     crestartOldestGci = 0;
     crestartNewestGci = 0;
     csrPhaseStarted = ZSR_NO_PHASE_STARTED;
@@ -16180,6 +16528,7 @@
 /*       SET SIMPLE TRANSACTION                                              */
 /* ------------------------------------------------------------------------- */
   LqhKeyReq::setSimpleFlag(Treqinfo, 1);
+  LqhKeyReq::setGCIFlag(Treqinfo, 1);
 /* ------------------------------------------------------------------------- */
 /* SET OPERATION TYPE AND LOCK MODE (NEVER READ OPERATION OR SCAN IN LOG)    */
 /* ------------------------------------------------------------------------- */

--- 1.35/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2005-12-06 09:24:55 +01:00
+++ 1.36/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2005-12-21 16:41:52 +01:00
@@ -401,9 +401,7 @@
       Next = 5,                 // looking for next extry
       Last = 6,                 // after last entry
       Aborting = 7,             // lock wait at scan close
-      Invalid = 9,              // cannot return REF to LQH currently
-
-      CurrentDeleted = 10       // Found deleted ROWID (used for NR)
+      Invalid = 9               // cannot return REF to LQH currently
     };
     Uint16 m_state;
 
@@ -1186,8 +1184,6 @@
     STATIC_CONST( MM_GROWN    = 0x00400000 ); // Has MM part grown
     STATIC_CONST( FREE        = 0x00800000 ); // On free list of page
     STATIC_CONST( LCP_SKIP    = 0x01000000 ); // Should not be returned in LCP
-    STATIC_CONST( LCP_KEEP    = 0x02000000 ); // Is deleted but kept in LCP
-    STATIC_CONST( NR_LOCK     = 0x04000000 ); // Locked for NR copy
     
     Uint32 get_tuple_version() const { 
       return m_header_bits & TUP_VERSION_MASK;
@@ -1385,8 +1381,13 @@
   int load_diskpage_scan(Signal*, Uint32 opRec, Uint32 fragPtrI, 
 			 Uint32 local_key, Uint32 flags);
 
+  int alloc_page(Tablerec*, Fragrecord*, PagePtr*,Uint32 page_no);
+  
   void start_restore_lcp(Uint32 tableId, Uint32 fragmentId);
   void complete_restore_lcp(Uint32 tableId, Uint32 fragmentId);
+
+  int nr_read_pk(Uint32 fragPtr, const Local_key*, Uint32* dataOut, bool&copy);
+  int nr_update_gci(Uint32 fragPtr, const Local_key*, Uint32 gci);
 private:
   BLOCK_DEFINES(Dbtup);
 

--- 1.8/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2005-11-25 20:12:39 +01:00
+++ 1.9/storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp	2005-12-21 16:41:52 +01:00
@@ -132,10 +132,11 @@
       disk_page_abort_prealloc(signal, regFragPtr.p, &key, key.m_page_idx);
     }
     
-    Uint32 bits= copy->m_header_bits;
+    Uint32 bits= tuple_ptr->m_header_bits;
+    Uint32 copy_bits= copy->m_header_bits;
     if(! (bits & Tuple_header::ALLOC))
     {
-      if(bits & Tuple_header::MM_GROWN)
+      if(copy_bits & Tuple_header::MM_GROWN)
       {
 	ndbout_c("abort grow");
 	Ptr<Page> vpage;
@@ -162,6 +163,16 @@
       {
 	ndbout_c("abort shrink");
       }
+    }
+    else if (regOperPtr.p->is_first_operation() && 
+	     regOperPtr.p->is_last_operation())
+    {
+      /**
+       * Aborting last operation that performed ALLOC
+       */
+      ndbout_c("clearing ALLOC");
+      tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC;
+      tuple_ptr->m_header_bits |= Tuple_header::FREE;
     }
   }
   

--- 1.9/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2005-12-05 16:54:20 +01:00
+++ 1.10/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2005-12-21 16:41:52 +01:00
@@ -51,6 +51,10 @@
     
     PagePtr pagePtr;
     Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
+
+    Uint32 *disk_ref_ptr = ptr->get_disk_ref_ptr(regTabPtr.p); 
+    signal->theData[0] = disk_ref_ptr[0];
+    signal->theData[1] = disk_ref_ptr[1];
     
     if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
     {

--- 1.32/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2005-12-05 10:38:32 +01:00
+++ 1.33/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2005-12-21 16:41:52 +01:00
@@ -2883,3 +2883,130 @@
   req_struct->m_tuple_ptr->m_header_bits = copy_bits;
   return 0;
 }
+
+int
+Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
+{
+  FragrecordPtr fragPtr;
+  fragPtr.i= fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  TablerecPtr tablePtr;
+  tablePtr.i= fragPtr.p->fragTableId;
+  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+
+  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
+  {
+    Local_key tmp = *key;
+    PagePtr page_ptr;
+    if (alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no))
+      return -1;
+    
+    Tuple_header* ptr = (Tuple_header*)
+      ((Fix_page*)page_ptr.p)->get_ptr(tmp.m_page_idx, 0);
+    
+    ndbrequire(ptr->m_header_bits & Tuple_header::FREE);
+    *ptr->get_mm_gci(tablePtr.p) = gci;
+  }
+  return 0;
+}
+
+int
+Dbtup::nr_read_pk(Uint32 fragPtrI, 
+		  const Local_key* key, Uint32* dst, bool& copy)
+{
+  
+  FragrecordPtr fragPtr;
+  fragPtr.i= fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  TablerecPtr tablePtr;
+  tablePtr.i= fragPtr.p->fragTableId;
+  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+
+  Local_key tmp = *key;
+  Uint32 pages = fragPtr.p->noOfPages;
+  
+  PagePtr page_ptr;
+  if (alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no))
+    return -1;
+
+  KeyReqStruct req_struct;
+  Uint32* ptr= ((Fix_page*)page_ptr.p)->get_ptr(key->m_page_idx, 0);
+  
+  req_struct.m_page_ptr = page_ptr;
+  req_struct.m_tuple_ptr = (Tuple_header*)ptr;
+  Uint32 bits = req_struct.m_tuple_ptr->m_header_bits;
+
+  int ret = 0;
+  copy = false;
+  if (! (bits & Tuple_header::FREE))
+  {
+    if (bits & Tuple_header::ALLOC)
+    {
+      Uint32 opPtrI= req_struct.m_tuple_ptr->m_operation_ptr_i;
+      Operationrec* opPtrP= c_operation_pool.getPtr(opPtrI);
+      ndbassert(!opPtrP->m_copy_tuple_location.isNull());
+      req_struct.m_tuple_ptr= (Tuple_header*)
+	c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
+      copy = true;
+    }
+    req_struct.check_offset[MM]= tablePtr.p->get_check_offset(MM);
+    req_struct.check_offset[DD]= tablePtr.p->get_check_offset(DD);
+    
+    Uint32 num_attr= tablePtr.p->m_no_of_attributes;
+    Uint32 descr_start= tablePtr.p->tabDescriptor;
+    TableDescriptor *tab_descr= &tableDescriptor[descr_start];
+    ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
+    req_struct.attr_descr= tab_descr; 
+
+    if (tablePtr.p->need_expand())
+      prepare_read(&req_struct, tablePtr.p, false);
+    
+    const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
+    const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
+    // read pk attributes from original tuple
+    
+    // new globals
+    tabptr= tablePtr;
+    fragptr= fragPtr;
+    operPtr.i= RNIL;
+    operPtr.p= NULL;
+    
+    // do it
+    ret = readAttributes(&req_struct,
+			 attrIds,
+			 numAttrs,
+			 dst,
+			 ZNIL, false);
+    
+    // done
+    if (likely(ret != -1)) {
+      // remove headers
+      Uint32 n= 0;
+      Uint32 i= 0;
+      while (n < numAttrs) {
+	const AttributeHeader ah(dst[i]);
+	Uint32 size= ah.getDataSize();
+	ndbrequire(size != 0);
+	for (Uint32 j= 0; j < size; j++) {
+	  dst[i + j - n]= dst[i + j + 1];
+	}
+	n+= 1;
+	i+= 1 + size;
+      }
+      ndbrequire((int)i == ret);
+      ret -= numAttrs;
+    } else {
+      return terrorCode ? (-(int)terrorCode) : -1;
+    }
+  }
+    
+  if (tablePtr.p->m_bits & Tablerec::TR_RowGCI)
+  {
+    dst[ret] = *req_struct.m_tuple_ptr->get_mm_gci(tablePtr.p);
+  }
+  else
+  {
+    dst[ret] = 0;
+  }
+  return ret;
+}

--- 1.7/storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp	2005-12-06 09:24:56 +01:00
+++ 1.8/storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp	2005-12-21 16:41:52 +01:00
@@ -146,7 +146,7 @@
   while (pos + nextTuple <= Fix_page::DATA_WORDS)
   {
     regPagePtr->m_data[pos] = (prev << 16) | (pos + nextTuple);
-    regPagePtr->m_data[pos + 1] = Tuple_header::FREE;
+    regPagePtr->m_data[pos + 1] = Fix_page::FREE_RECORD;
     regPagePtr->m_data[pos + gci_pos] = gci_val;
     prev = pos;
     pos += nextTuple;
@@ -204,58 +204,76 @@
   } 
 }//Dbtup::freeTh()
 
-Uint32*
-Dbtup::alloc_fix_rowid(Fragrecord* const regFragPtr,
-		       Tablerec* const regTabPtr,
-		       Local_key* key,
-		       Uint32 * out_frag_page_id) 
+
+int
+Dbtup::alloc_page(Tablerec* tabPtrP, Fragrecord* fragPtrP, 
+		  PagePtr * ret, Uint32 page_no)
 {
-  Uint32 page_no = key->m_page_no;
-  Uint32 pages = regFragPtr->noOfPages;
-  Uint32 idx= key->m_page_idx;
+  Uint32 pages = fragPtrP->noOfPages;
   
   if (page_no >= pages)
   {
     Uint32 start = pages;
     while(page_no >= pages)
       pages += (pages >> 3) + (pages >> 4) + 2;
-    allocFragPages(regFragPtr, pages - start);
-    if (page_no >= (pages = regFragPtr->noOfPages))
+    allocFragPages(fragPtrP, pages - start);
+    if (page_no >= (pages = fragPtrP->noOfPages))
     {
       terrorCode = ZMEM_NOMEM_ERROR;
-      return 0;
+      return 1;
     }
   }
   
   PagePtr pagePtr;
-  pagePtr.i = getRealpid(regFragPtr, page_no);
-  c_page_pool.getPtr(pagePtr);
+  c_page_pool.getPtr(pagePtr, getRealpid(fragPtrP, page_no));
   
-  Uint32 state = pagePtr.p->page_state;
-  LocalDLList<Page> free_pages(c_page_pool, regFragPtr->thFreeFirst);
-  LocalDLList<Page> alloc_pages(c_page_pool, regFragPtr->emptyPrimPage);
-  switch(state){
-  case ZEMPTY_MM:
-    convertThPage((Fix_page*)pagePtr.p, regTabPtr, MM);
-    
+  LocalDLList<Page> alloc_pages(c_page_pool, fragPtrP->emptyPrimPage);
+  LocalDLList<Page> free_pages(c_page_pool, fragPtrP->thFreeFirst);
+  if (pagePtr.p->page_state == ZEMPTY_MM)
+  {
+    convertThPage((Fix_page*)pagePtr.p, tabPtrP, MM);
     pagePtr.p->page_state = ZTH_MM_FREE;
     alloc_pages.remove(pagePtr);
     free_pages.add(pagePtr);
-    // continue
+  }
+  
+  *ret = pagePtr;
+  return 0;
+}
+
+Uint32*
+Dbtup::alloc_fix_rowid(Fragrecord* const regFragPtr,
+		       Tablerec* const regTabPtr,
+		       Local_key* key,
+		       Uint32 * out_frag_page_id) 
+{
+  Uint32 page_no = key->m_page_no;
+  Uint32 idx= key->m_page_idx;
+  
+  PagePtr pagePtr;
+  if (alloc_page(regTabPtr, regFragPtr, &pagePtr, page_no))
+  {
+    terrorCode = ZMEM_NOMEM_ERROR;
+    return 0;
+  }
+
+  Uint32 state = pagePtr.p->page_state;
+  LocalDLList<Page> free_pages(c_page_pool, regFragPtr->thFreeFirst);
+  switch(state){
   case ZTH_MM_FREE:
     if (((Fix_page*)pagePtr.p)->alloc_record(idx) != idx)
     {
       terrorCode = ZROWID_ALLOCATED;
       return 0;
     }
-
+    
     if(pagePtr.p->free_space == 0)
     {
       jam();
       pagePtr.p->page_state = ZTH_MM_FULL;
       free_pages.remove(pagePtr);
     }
-
+    
     *out_frag_page_id= page_no;
     key->m_page_no = pagePtr.i;
     key->m_page_idx = idx;
@@ -263,5 +281,7 @@
   case ZTH_MM_FULL:
     terrorCode = ZROWID_ALLOCATED;
     return 0;
+  case ZEMPTY_MM:
+    ndbrequire(false);
   }
 }

--- 1.12/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2005-12-06 09:24:56 +01:00
+++ 1.13/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2005-12-21 16:41:52 +01:00
@@ -280,7 +280,7 @@
       const ScanPos& pos = scan.m_scanPos;
       const Local_key& key_mm = pos.m_key_mm;
       int ret = tuxReadPk(fragPtr.i, pos.m_realpid_mm, key_mm.m_page_idx,
-          pkData, false);
+			  pkData, false);
       ndbrequire(ret > 0);
       pkSize = ret;
       dbg((DBTUP, "PK size=%d data=%08x", pkSize, pkData[0]));
@@ -353,10 +353,7 @@
       scan.m_state = ScanOp::Locked;
     }
   } 
-  else if (scan.m_state == ScanOp::CurrentDeleted)
-  {
-    
-  }
+
   if (scan.m_state == ScanOp::Locked) {
     // we have lock or do not need one
     jam();
@@ -478,8 +475,20 @@
     // scan position should already have been moved (assert only)
     if (scan.m_state == ScanOp::Blocked) {
       jam();
-      ndbassert(false);
-      scan.m_state = ScanOp::Next;
+      //ndbassert(false);
+      if (scan.m_bits & ScanOp::SCAN_NR)
+      {
+	jam();
+	scan.m_state = ScanOp::Next;
+	scan.m_scanPos.m_get = ScanPos::Get_tuple;
+	ndbout_c("Ignoring scan.m_state == ScanOp::Blocked, refetch");
+      }
+      else
+      {
+	jam();
+	scan.m_state = ScanOp::Next;
+	ndbout_c("Ignoring scan.m_state == ScanOp::Blocked");
+      }
     }
     // LQH has the ball
     return;
@@ -757,8 +766,6 @@
 	      }
 	      else
 	      {
-		ndbout << "found deleted tuple" << pos.m_key 
-		       << " " << foundGCI << endl;
 		goto found_deleted_rowid;
 	      }
 	    }
@@ -766,9 +773,6 @@
 	    {
 	      jam();
 	      // skip free tuple
-	      ndbout << "Skipping " << pos.m_key << " gci: " 
-		     << *th->get_mm_gci(tablePtr.p)
-		     << " scanGCI: " << scanGCI << endl;
 	    }
 	  }
         } else {
@@ -812,31 +816,38 @@
         Fix_page* page = (Fix_page*)pos.m_page;
 	if (! (bits & ScanOp::SCAN_DD)) {
 	  key_mm = pos.m_key;
-	  ndbrequire(page->alloc_record(key.m_page_idx)==key.m_page_idx);
-	  th->m_header_bits |= Tuple_header::FREE; // Not foundable...
-	
-        // caller has already set pos.m_get to next tuple
+	  // caller has already set pos.m_get to next tuple
 	  // real page id is already set
 	} else {
 	  key_mm.assref(th->m_base_record_ref);
 	  // recompute for each disk tuple
 	  pos.m_realpid_mm = getRealpid(fragPtr.p, key_mm.m_page_no);
-
+	  
 	  Fix_page *mmpage = (Fix_page*)c_page_pool.getPtr(pos.m_realpid_mm);
 	  th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
 	  if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
 	  {
 	    if (! (th->m_header_bits & Tuple_header::FREE))
 	      break;
-	    
-	    ndbrequire(mmpage->alloc_record(key_mm.m_page_idx) ==
-		       key_mm.m_page_idx);
-	    th->m_header_bits |= Tuple_header::FREE; // Not foundable...
 	  }
 	}
+	
+	NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
+	conf->scanPtr = scan.m_userPtr;
+	conf->accOperationPtr = RNIL;
+	conf->fragId = frag.fragmentId;
+	conf->localKey[0] = pos.m_key_mm.ref();
+	conf->localKey[1] = 0;
+	conf->localKeyLength = 1;
+	conf->gci = foundGCI;
+	Uint32 blockNo = refToBlock(scan.m_userRef);
+	EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
+	jamEntry();
+
 	// TUPKEYREQ handles savepoint stuff
-	scan.m_state = ScanOp::CurrentDeleted;
-	return true;
+	loop_count = 32;
+	scan.m_state = ScanOp::Next;
+	return false;
       }
       break; // incr loop count
     default:

-- 
MySQL Internals Mailing List
For list archives: http://lists.mysql.com/internals
To unsubscribe:    http://lists.mysql.com/internals?unsub=mysql-internals@progressive-comp.com



[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic