[prev in list] [next in list] [prev in thread] [next in thread]
List: monetdb-checkins
Subject: [Monetdb-checkins] MonetDB5/src/modules/mal/crackers crackers.mx, ,
From: stratos <idreos () users ! sourceforge ! net>
Date: 2009-12-29 15:54:46
Message-ID: E1NPePG-0000vE-L2 () sfp-cvsdas-1 ! v30 ! ch3 ! sourceforge ! com
[Download RAW message or body]
Update of /cvsroot/monetdb/MonetDB5/src/modules/mal/crackers
In directory sfp-cvsdas-1.v30.ch3.sourceforge.com:/tmp/cvs-serv3260
Modified Files:
crackers.mx crackers_sortmerge.mx
Log Message:
towards a more external variation of sort
Index: crackers.mx
===================================================================
RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/crackers/crackers.mx,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -d -r1.26 -r1.27
--- crackers.mx 28 Dec 2009 08:24:40 -0000 1.26
+++ crackers.mx 29 Dec 2009 15:54:44 -0000 1.27
@@ -241,6 +241,10 @@
command extsort(b:bat[:oid,:@2], mode:int):bat[:oid,:@2]
address CRKPartitionedSort_@2
comment "Slice and sort";
+
+command extsort2(b:bat[:oid,:@2], sliceSize:int):bat[:oid,:@2]
+address CRKExternalSort_@2
+comment "Slice and external sort";
@
@= JoinSelect
Index: crackers_sortmerge.mx
===================================================================
RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/crackers/crackers_sortmerge.mx,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- crackers_sortmerge.mx 28 Dec 2009 08:24:40 -0000 1.10
+++ crackers_sortmerge.mx 29 Dec 2009 15:54:44 -0000 1.11
@@ -49,6 +49,7 @@
@= SelectFunctions_decl
crackers_export str CRKselectSortMerge_@1(int *vid, int *bid, @1 *low, @1 *hgh, bit \
*inclusiveLow, bit *inclusiveHgh, int *mode, bit *rounding, int *sliceSize); \
crackers_export str CRKPartitionedSort_@1(int *vid, int *bid, int *mode); \
+crackers_export str CRKExternalSort_@1(int *vid, int *bid, int *psliceSize); @
@@ -109,6 +110,123 @@
}
@
+@= MergeExtSort
+
+ QNodesSize=0;
+ current=head;
+ while(current!=NULL){
+ current->start=0;
+ current->tuples=BATcount(current->slice);
+ current->merged=current->tuples;
+ QNodes[QNodesSize++]=current;
+ current=current->next;
+ }
+
+{
+ PQ_state pq;
+ PQ_Index heapsize = QNodesSize;
+ PQ_Key const range = 1<<26;
+ size_t size = PQ_RequiredSpace (heapsize, 0);
+ char * space = malloc (size);
+ int resIndex;
+ int deferred;
+ oid *sh;
+ @1 *st;
+ oid mergeTuples;
+
+ rh = (oid*)Hloc(br,BUNfirst(br));
+ rt = (@1 *)Tloc(br,BUNfirst(br));
+
+ /* We use the value "as is" as PQ_Key for PQ_push();
+ * PQ_Key is defined as unsigned int;
+ * hence, we can (for now) only allow <= 4 byte integer types, here.
+ *
+ * TODO:
+ * Either fix the implementation to indeed (correctly) support all types,
+ * or omit the PQMerge inplementation for all non-supported types.
+ */
+#if SIZEOF_WRD == SIZEOF_INT
+#define MAX_4_BYTE_INT_TYPE TYPE_wrd
+#else
+#if SIZEOF_OID == SIZEOF_INT
+#define MAX_4_BYTE_INT_TYPE TYPE_oid
+#else
+#define MAX_4_BYTE_INT_TYPE TYPE_int
+#endif
+#endif
+ if (TYPE_@1 <= TYPE_void || TYPE_@1 > MAX_4_BYTE_INT_TYPE)
+ throw(MAL, "crackers.sortmerge", "Type not supported by PQMerge");
+
+ PQ_PriorityQueue (& pq, "merge", space, size, heapsize, range, 0, 0, NULL, NULL);
+
+ /*QNodes is an array of pointers to a simple struct
+ that contains info on the slices with qualifying values*/
+
+ /*put all first values in the queue*/
+ for(j=0;j<QNodesSize;j++){
+ st=(@1 *)Tloc(QNodes[j]->slice,BUNfirst(QNodes[j]->slice)+QNodes[j]->start);
+ PQ_push (& pq, j, (PQ_Key)*st, 0, NULL);
+ }
+
+ /*pop the smallest and push the next one from the same slice*/
+ mergeTuples=totalTuples-QNodesSize;
+ for(i=0;i<mergeTuples;i++){
+ resIndex = PQ_pop (& pq, & deferred, NULL);
+ sh=(oid *)Hloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+ st=(@1 *)Tloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+
+ *rh=*sh;
+ *rt=*st;
+ rh++;rt++;
+ QNodes[resIndex]->start++;
+ QNodes[resIndex]->tuples--;
+
+ if (QNodes[resIndex]->tuples>0){
+ st++;
+ PQ_push (& pq, resIndex, (PQ_Key)*st, 0, NULL);
+ }else {
+ BAT *fullSlice;
+ BBPunfix(QNodes[resIndex]->slice->batCacheid);
+ /*BBPreclaim(QNodes[resIndex]->slice);*/
+ if ((fullSlice = BATdescriptor(QNodes[resIndex]->batId)) == NULL)
+ throw(MAL, "crackers.sortmerge", "Cannot access descriptor");
+
+ if (QNodes[resIndex]->merged >= BATcount(fullSlice)){
+ i--;
+ QNodesSize--;
+ }else{
+ BAT *tempslice;
+ oid nextBatch=bucketSize>(BATcount(fullSlice)-QNodes[resIndex]->merged)?BATcount(fullSlice)-QNodes[resIndex]->merged:bucketSize;
+ tempslice=BATslice(fullSlice,QNodes[resIndex]->merged+1,QNodes[resIndex]->merged+1+nextBatch);
+ QNodes[resIndex]->slice=BATcopy(tempslice,tempslice->htype, tempslice->ttype, \
TRUE); + BBPunfix(tempslice->batCacheid);
+ QNodes[resIndex]->start=0;
+ QNodes[resIndex]->tuples=nextBatch;
+ QNodes[resIndex]->merged+=nextBatch;
+
+ st=(@1 *)Tloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+ PQ_push (& pq, resIndex, (PQ_Key)*st, 0, NULL);
+ }
+ BBPunfix(fullSlice->batCacheid);
+ /*BBPreclaim(fullSlice);*/
+ }
+ }
+
+ /*pop all remaining values*/
+ for(j=0;j<QNodesSize;j++){
+ resIndex = PQ_pop (& pq, & deferred, NULL);
+ sh=(oid *)Hloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+ st=(@1 *)Tloc(QNodes[resIndex]->slice,BUNfirst(QNodes[resIndex]->slice)+QNodes[resIndex]->start);
+
+ *rh=*sh;
+ *rt=*st;
+ rh++;rt++;
+ QNodes[resIndex]->start++;
+ QNodes[resIndex]->tuples--;
+ }
+}
+@
+
@= NwayMerge
if (!gotSlices){
/* Take the slices */
@@ -800,7 +918,6 @@
b->hdense = FALSE;
b->tdense = FALSE;
BATkey(BATmirror(b),FALSE);
- BATmode(b,PERSISTENT);
b->batRestricted= BAT_WRITE;
t1=GDKusec();
BATmirror(BATorder(BATmirror(b)));
@@ -820,6 +937,7 @@
current=smNode;
}
+ BATmode(b,PERSISTENT);
BBPkeepref(b->batCacheid);
BBPunfix(slice->batCacheid);
@@ -980,7 +1098,6 @@
printf("\n Missed case! \n");
printf("Merged "OIDFMT" tuples \n",totalmergedTuples);
- //view = BATslice(VIEWhead_(br, BAT_READ), resStart, resEnd);
if (*rounding){
BAT *temp=BATslice(br,resStart, resEnd);
temp->hsorted = FALSE;
@@ -1218,6 +1335,103 @@
return MAL_SUCCEED;
@
+@= ExtSortBody
+ BAT *b,*bo,*br;
+ oid sliceSize,i;
+ struct sortMergeNode *current,*head=NULL;
+ @1 *rt;
+ oid *rh;
+ oid endSlice,startSlice;
+ oid totalTuples;
+ struct sortMergeNode *QNodes[1000000];
+ int QNodesSize=0;
+ int j;
+ oid tupleSize,memoryForMerging,bucketSize;
+ lng timeStart=GDKusec();
+
+
+ if ((bo = BATdescriptor(*bid)) == NULL)
+ throw(MAL, "crackers.sortmerge", "Cannot access descriptor");
+
+ /*slicing*/
+ sliceSize=*psliceSize;
+ tupleSize=12;
+ memoryForMerging=1024*1024*1024;
+ bucketSize= ((memoryForMerging/tupleSize)) / (BATcount(bo)/sliceSize);
+ bucketSize=bucketSize>sliceSize?sliceSize:bucketSize;
+ printf("use buckets of "OIDFMT" tuples\n ",bucketSize);
+
+ /*sort the slices*/
+ current=NULL;
+ startSlice=0;
+ endSlice=sliceSize>BATcount(bo)?BATcount(bo):sliceSize;
+
+ while (1){
+ struct sortMergeNode *smNode;
+ BAT *tempslice;
+ BAT *slice=BATslice(bo, startSlice, endSlice);
+ b = BATcopy(slice, bo->htype, bo->ttype, TRUE);
+ BBPunfix(slice->batCacheid);
+ if ( bo->htype == TYPE_void)
+ b = BATmaterializeh(b);
+ b->hsorted = FALSE;
+ b->tsorted = FALSE;
+ b->hdense = FALSE;
+ b->tdense = FALSE;
+ BATkey(BATmirror(b),FALSE);
+ b->batRestricted= BAT_WRITE;
+ BATmirror(BATorder(BATmirror(b)));
+
+ smNode=(struct sortMergeNode *)GDKmalloc(sizeof(struct sortMergeNode));
+ smNode->batId=b->batCacheid;
+ tempslice=BATslice(b,0,bucketSize);
+ smNode->slice=BATcopy(tempslice,tempslice->htype, tempslice->ttype, TRUE);
+ BBPunfix(tempslice->batCacheid);
+ smNode->next=NULL;
+
+ if (head==NULL){
+ head=smNode;
+ current=head;
+ }
+ else{
+ current->next = smNode;
+ current=smNode;
+ }
+
+ /*BATsave(b);*/
+ BATmode(b,PERSISTENT);
+ BBPkeepref(b->batCacheid);
+ BBPreclaim(b);
+
+ if (endSlice==BATcount(bo))
+ break;
+ startSlice=endSlice;
+ endSlice=endSlice+sliceSize>BATcount(bo)?BATcount(bo):endSlice+sliceSize;
+ }
+
+ br=BATnew(TYPE_oid, bo->ttype,BATcount(bo));
+ BATsetcount(br,BATcount(bo));
+ totalTuples=BATcount(br);
+ BBPunfix(bo->batCacheid);
+
+ printf("sorting phase %lld \n",GDKusec()-timeStart);
+
+ @:MergeExtSort(@1,@2,@3,@4)@
+
+ br->hsorted = FALSE;
+ br->tsorted = TRUE;
+ br->hdense = TRUE;
+ br->tdense = TRUE;
+ BATkey(BATmirror(br),FALSE);
+ BATmode(br,PERSISTENT);
+ br->batRestricted= BAT_READ;
+ *vid = br->batCacheid;
+ BBPkeepref(*vid);
+
+ return MAL_SUCCEED;
+@
+
+
@= SortMergeOperations
static str
SelectSortMergeBounds_@1(int *vid, int *bid, @1 *plow, @1 *phgh, bit *inclusiveLow, \
bit *inclusiveHgh, int *mode, bit *rounding, int *psliceSize){ @@ -1229,6 +1443,10 \
@@ @:SortBody(@1,@2,@3,@4,)@
}
+str
+CRKExternalSort_@1(int *vid, int *bid, int *psliceSize){
+ @:ExtSortBody(@1,@2,@3,@4,)@
+}
@
@}
------------------------------------------------------------------------------
This SF.Net email is sponsored by the Verizon Developer Community
Take advantage of Verizon's best-in-class app development support
A streamlined, 14 day to market process makes app distribution fast and easy
Join now and get one step closer to millions of Verizon customers
http://p.sf.net/sfu/verizon-dev2dev
_______________________________________________
Monetdb-checkins mailing list
Monetdb-checkins@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/monetdb-checkins
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic