[prev in list] [next in list] [prev in thread] [next in thread] 

List:       monetdb-checkins
Subject:    [Monetdb-checkins] MonetDB5/src/modules/mal/crackers crackers.mx, ,
From:       stratos <idreos () users ! sourceforge ! net>
Date:       2009-12-29 15:54:46
Message-ID: E1NPePG-0000vE-L2 () sfp-cvsdas-1 ! v30 ! ch3 ! sourceforge ! com
[Download RAW message or body]

Update of /cvsroot/monetdb/MonetDB5/src/modules/mal/crackers
In directory sfp-cvsdas-1.v30.ch3.sourceforge.com:/tmp/cvs-serv3260

Modified Files:
	crackers.mx crackers_sortmerge.mx 
Log Message:
towards a more external variation of sort



Index: crackers.mx
===================================================================
RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/crackers/crackers.mx,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -d -r1.26 -r1.27
--- crackers.mx	28 Dec 2009 08:24:40 -0000	1.26
+++ crackers.mx	29 Dec 2009 15:54:44 -0000	1.27
@@ -241,6 +241,10 @@
 command extsort(b:bat[:oid,:@2], mode:int):bat[:oid,:@2]
 address CRKPartitionedSort_@2
 comment "Slice and sort";
+
+command extsort2(b:bat[:oid,:@2], sliceSize:int):bat[:oid,:@2]
+address CRKExternalSort_@2
+comment "Slice and external sort";
 @
 
 @= JoinSelect

Index: crackers_sortmerge.mx
===================================================================
RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/crackers/crackers_sortmerge.mx,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- crackers_sortmerge.mx	28 Dec 2009 08:24:40 -0000	1.10
+++ crackers_sortmerge.mx	29 Dec 2009 15:54:44 -0000	1.11
@@ -49,6 +49,7 @@
 @= SelectFunctions_decl
 crackers_export str CRKselectSortMerge_@1(int *vid, int *bid, @1 *low, @1 *hgh, bit *inclusiveLow, bit *inclusiveHgh, int *mode, bit *rounding, int *sliceSize);
 crackers_export str CRKPartitionedSort_@1(int *vid, int *bid, int *mode);
+crackers_export str CRKExternalSort_@1(int *vid, int *bid, int *psliceSize);
 @
 
 
@@ -109,6 +110,123 @@
 	}
 @
 
+@= MergeExtSort
+
+	QNodesSize=0;
+	current=head;
+	while(current!=NULL){
+		current->start=0;
+		current->tuples=BATcount(current->slice);
+		current->merged=current->tuples;
+		QNodes[QNodesSize++]=current;
+		current=current->next;
+	}
+
+{
+	PQ_state pq;
+	PQ_Index heapsize = QNodesSize;
+	PQ_Key const range = 1<<26;
+	size_t size = PQ_RequiredSpace (heapsize, 0);
+	char * space = malloc (size);
+	int resIndex;
+	int deferred;
+	oid *sh;
+	@1 *st;
+	oid mergeTuples;
+
+	rh = (oid*)Hloc(br,BUNfirst(br));
+	rt = (@1 *)Tloc(br,BUNfirst(br));
+
+	/* We use the value "as is" as PQ_Key for PQ_push();
+	 * PQ_Key is defined as unsigned int;
+	 * hence, we can (for now) only allow <= 4 byte integer types, here.
+	 *
+	 * TODO:
+	 * Either fix the implementation to indeed (correctly) support all types,
+	 * or omit the PQMerge implementation for all non-supported types.
+	 */
+#if SIZEOF_WRD == SIZEOF_INT
+#define MAX_4_BYTE_INT_TYPE TYPE_wrd
+#else
+#if SIZEOF_OID == SIZEOF_INT
+#define MAX_4_BYTE_INT_TYPE TYPE_oid
+#else
+#define MAX_4_BYTE_INT_TYPE TYPE_int
+#endif
+#endif
+	if (TYPE_@1 <= TYPE_void || TYPE_@1 > MAX_4_BYTE_INT_TYPE)
+		throw(MAL, "crackers.sortmerge", "Type not supported by PQMerge");
+
+	PQ_PriorityQueue (& pq, "merge", space, size, heapsize, range, 0, 0, NULL, NULL);
+	
+	/*QNodes is an array of pointers to a simple struct 
+	that contains info on the slices with qualifying values*/
+
+	/*put all first values in the queue*/
+	for(j=0;j<QNodesSize;j++){
+		st=(@1  *)Tloc(QNodes[j]->slice,BUNfirst(QNodes[j]->slice)+QNodes[j]->start); 
+		PQ_push (& pq, j, (PQ_Key)*st, 0, NULL);
+	}
+
+	/*pop the smallest and push the next one from the same slice*/
+	mergeTuples=totalTuples-QNodesSize;
+	for(i=0;i<mergeTuples;i++){
+		resIndex = PQ_pop (& pq, & deferred, NULL);
+		sh=(oid *)Hloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+		st=(@1  *)Tloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+		
+		*rh=*sh;
+		*rt=*st;
+		rh++;rt++;
+		QNodes[resIndex]->start++;
+		QNodes[resIndex]->tuples--;
+		
+		if (QNodes[resIndex]->tuples>0){
+			st++;	
+			PQ_push (& pq, resIndex, (PQ_Key)*st, 0, NULL);
+		}else {
+			BAT *fullSlice;
+			BBPunfix(QNodes[resIndex]->slice->batCacheid);
+			/*BBPreclaim(QNodes[resIndex]->slice);*/
+			if ((fullSlice = BATdescriptor(QNodes[resIndex]->batId)) == NULL)
+				throw(MAL, "crackers.sortmerge", "Cannot access descriptor");
+
+			if (QNodes[resIndex]->merged >= BATcount(fullSlice)){ 
+				i--;
+				QNodesSize--;
+			}else{
+				BAT *tempslice;
+				oid nextBatch=bucketSize>(BATcount(fullSlice)-QNodes[resIndex]->merged)?BATcount(fullSlice)-QNodes[resIndex]->merged:bucketSize;
+				tempslice=BATslice(fullSlice,QNodes[resIndex]->merged+1,QNodes[resIndex]->merged+1+nextBatch);		
+				QNodes[resIndex]->slice=BATcopy(tempslice,tempslice->htype, tempslice->ttype, TRUE);
+				BBPunfix(tempslice->batCacheid);
+				QNodes[resIndex]->start=0;
+				QNodes[resIndex]->tuples=nextBatch;	
+				QNodes[resIndex]->merged+=nextBatch;
+
+				st=(@1  *)Tloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+				PQ_push (& pq, resIndex, (PQ_Key)*st, 0, NULL);
+			}		
+			BBPunfix(fullSlice->batCacheid);
+			/*BBPreclaim(fullSlice);*/
+		}	
+	}
+
+	/*pop all remaining values*/
+	for(j=0;j<QNodesSize;j++){
+		resIndex = PQ_pop (& pq, & deferred, NULL);
+		sh=(oid *)Hloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+		st=(@1  *)Tloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+		
+		*rh=*sh;
+		*rt=*st;
+		rh++;rt++;
+		QNodes[resIndex]->start++;
+		QNodes[resIndex]->tuples--;
+	}
+}
+@
+
 @= NwayMerge
 	if (!gotSlices){
 		/* Take the slices */
@@ -800,7 +918,6 @@
 			b->hdense = FALSE;
 			b->tdense = FALSE;
 			BATkey(BATmirror(b),FALSE);
-			BATmode(b,PERSISTENT);
 			b->batRestricted= BAT_WRITE;
 			t1=GDKusec();
 			BATmirror(BATorder(BATmirror(b)));
@@ -820,6 +937,7 @@
 				current=smNode;
 			}
 			
+			BATmode(b,PERSISTENT);
 			BBPkeepref(b->batCacheid);
 			BBPunfix(slice->batCacheid);
 
@@ -980,7 +1098,6 @@
 		printf("\n Missed case! \n");
 
 	printf("Merged "OIDFMT" tuples \n",totalmergedTuples);	
-	//view = BATslice(VIEWhead_(br, BAT_READ), resStart, resEnd);
 	if (*rounding){
 		BAT *temp=BATslice(br,resStart, resEnd);
 		temp->hsorted = FALSE;
@@ -1218,6 +1335,103 @@
 	return MAL_SUCCEED;
 @
 
+@= ExtSortBody
+	BAT *b,*bo,*br;
+	oid sliceSize,i;
+	struct sortMergeNode *current,*head=NULL;
+	@1  *rt;
+	oid *rh;
+	oid endSlice,startSlice;
+	oid totalTuples;
+	struct sortMergeNode *QNodes[1000000];
+	int QNodesSize=0;
+	int j;
+	oid tupleSize,memoryForMerging,bucketSize;
+	lng timeStart=GDKusec();
+	
+	
+	if ((bo = BATdescriptor(*bid)) == NULL)
+		throw(MAL, "crackers.sortmerge", "Cannot access descriptor");
+
+	/*slicing*/
+	sliceSize=*psliceSize;
+	tupleSize=12;
+	memoryForMerging=1024*1024*1024;
+	bucketSize= ((memoryForMerging/tupleSize)) / (BATcount(bo)/sliceSize);
+	bucketSize=bucketSize>sliceSize?sliceSize:bucketSize;
+	printf("use buckets of "OIDFMT" tuples\n ",bucketSize);
+
+	/*sort the slices*/
+	current=NULL;
+	startSlice=0;
+	endSlice=sliceSize>BATcount(bo)?BATcount(bo):sliceSize;
+
+	while (1){
+		struct sortMergeNode *smNode;
+		BAT *tempslice;
+		BAT *slice=BATslice(bo, startSlice, endSlice);
+		b = BATcopy(slice, bo->htype, bo->ttype, TRUE);
+		BBPunfix(slice->batCacheid);
+		if ( bo->htype == TYPE_void)
+			b = BATmaterializeh(b);
+		b->hsorted = FALSE;
+		b->tsorted = FALSE;
+		b->hdense = FALSE;
+		b->tdense = FALSE;
+		BATkey(BATmirror(b),FALSE);
+		b->batRestricted= BAT_WRITE;
+		BATmirror(BATorder(BATmirror(b)));
+
+		smNode=(struct sortMergeNode *)GDKmalloc(sizeof(struct sortMergeNode));
+		smNode->batId=b->batCacheid;
+		tempslice=BATslice(b,0,bucketSize);
+		smNode->slice=BATcopy(tempslice,tempslice->htype, tempslice->ttype, TRUE);
+		BBPunfix(tempslice->batCacheid);
+		smNode->next=NULL;
+		
+		if (head==NULL){
+			head=smNode;
+			current=head;
+		}
+		else{	
+			current->next = smNode;
+			current=smNode;
+		}
+		
+		/*BATsave(b);*/
+		BATmode(b,PERSISTENT);
+		BBPkeepref(b->batCacheid);
+		BBPreclaim(b);
+
+		if (endSlice==BATcount(bo))
+			break;
+		startSlice=endSlice;
+		endSlice=endSlice+sliceSize>BATcount(bo)?BATcount(bo):endSlice+sliceSize;
+	}
+	
+	br=BATnew(TYPE_oid, bo->ttype,BATcount(bo));
+	BATsetcount(br,BATcount(bo));
+	totalTuples=BATcount(br);
+	BBPunfix(bo->batCacheid);
+	
+	printf("sorting phase %lld \n",GDKusec()-timeStart);
+
+	@:MergeExtSort(@1,@2,@3,@4)@
+
+	br->hsorted = FALSE;
+	br->tsorted = TRUE;
+	br->hdense =  TRUE;
+	br->tdense =  TRUE;
+	BATkey(BATmirror(br),FALSE);
+	BATmode(br,PERSISTENT);
+	br->batRestricted= BAT_READ;
+	*vid = br->batCacheid;
+	BBPkeepref(*vid);
+
+	return MAL_SUCCEED;
+@
+
+
 @= SortMergeOperations
 static str
 SelectSortMergeBounds_@1(int *vid, int *bid, @1 *plow, @1 *phgh, bit *inclusiveLow, bit *inclusiveHgh, int *mode, bit *rounding, int *psliceSize){	
@@ -1229,6 +1443,10 @@
 @:SortBody(@1,@2,@3,@4,)@
 }
 
+str
+CRKExternalSort_@1(int *vid, int *bid, int *psliceSize){	
+	@:ExtSortBody(@1,@2,@3,@4,)@
+}
 @
 
 @}


------------------------------------------------------------------------------
This SF.Net email is sponsored by the Verizon Developer Community
Take advantage of Verizon's best-in-class app development support
A streamlined, 14 day to market process makes app distribution fast and easy
Join now and get one step closer to millions of Verizon customers
http://p.sf.net/sfu/verizon-dev2dev 
_______________________________________________
Monetdb-checkins mailing list
Monetdb-checkins@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/monetdb-checkins


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic