[prev in list] [next in list] [prev in thread] [next in thread] 

List:       monetdb-checkins
Subject:    MonetDB: partition - Added single key partitioning
From:       Martin Kersten <commits+mk=cwi.nl () monetdb ! org>
Date:       2017-12-31 13:23:02
Message-ID: hg.94a6c138fe70.1514726582.6315528441665844383 () monetdb-vm0 ! spin-off ! cwi ! nl
[Download RAW message or body]

Changeset: 94a6c138fe70 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=94a6c138fe70
Modified Files:
	clients/Tests/MAL-signatures.stable.out
	clients/Tests/MAL-signatures.stable.out.int128
	clients/Tests/exports.stable.out
	monetdb5/modules/mal/Tests/hashpartition.malC
	monetdb5/modules/mal/Tests/hashpartition.stable.out
	monetdb5/modules/mal/partition.c
	monetdb5/modules/mal/partition.h
	monetdb5/modules/mal/partition.mal
Branch: partition
Log Message:

Added single-key partitioning.
As an alternative to producing all partitions at once, partition.hash and
partition.slice can now cut out a single partition, selected by its hash key.


diffs (truncated from 325 to 300 lines):

diff --git a/clients/Tests/MAL-signatures.stable.out \
                b/clients/Tests/MAL-signatures.stable.out
--- a/clients/Tests/MAL-signatures.stable.out
+++ b/clients/Tests/MAL-signatures.stable.out
@@ -8228,7 +8228,9 @@ Ready.
 [ "optimizer",	"wlc",	"pattern optimizer.wlc():str ",	"OPTwrapper;",	""	]
 [ "optimizer",	"wlc",	"pattern optimizer.wlc(mod:str, fcn:str):str \
",	"OPTwrapper;",	"Inject the workload capture-replay primitives."	]  [ \
"partition",	"hash",	"pattern partition.hash(b:bat[:any_2]):bat[:any_2]... \
",	"PARThash;",	"Perform a value based partition "	] +[ "partition",	"hash",	"pattern \
partition.hash(b:bat[:any_2], key:int, pieces:int):bat[:any_2] \
",	"PARThashkeyed;",	"Perform a value based partition "	]  [ \
"partition",	"slice",	"pattern partition.slice(b:bat[:any_2]):bat[:oid]... \
",	"PARTslice;",	"Perform a value based partition to produce candidate lists "	] +[ \
"partition",	"slice",	"pattern partition.slice(b:bat[:any_2], key:int, \
pieces:int):bat[:oid] ",	"PARTslicekeyed;",	"Perform a value based partition to \
produce candidate lists "	]  [ "pcre",	"imatch",	"command pcre.imatch(s:str, \
pat:str):bit ",	"PCREimatch;",	"Caseless Perl Compatible Regular Expression pattern \
matching against a string"	]  [ "pcre",	"index",	"command pcre.index(pat:pcre, \
s:str):int ",	"PCREindex;",	"match a pattern, return matched position (or 0 when not \
found)"	]  [ "pcre",	"match",	"command pcre.match(s:str, pat:str):bit \
",	"PCREmatch;",	"Perl Compatible Regular Expression pattern matching against a \
                string"	]
diff --git a/clients/Tests/MAL-signatures.stable.out.int128 \
                b/clients/Tests/MAL-signatures.stable.out.int128
--- a/clients/Tests/MAL-signatures.stable.out.int128
+++ b/clients/Tests/MAL-signatures.stable.out.int128
@@ -10590,7 +10590,9 @@ Ready.
 [ "optimizer",	"wlc",	"pattern optimizer.wlc():str ",	"OPTwrapper;",	""	]
 [ "optimizer",	"wlc",	"pattern optimizer.wlc(mod:str, fcn:str):str \
",	"OPTwrapper;",	"Inject the workload capture-replay primitives."	]  [ \
"partition",	"hash",	"pattern partition.hash(b:bat[:any_2]):bat[:any_2]... \
",	"PARThash;",	"Perform a value based partition "	] +[ "partition",	"hash",	"pattern \
partition.hash(b:bat[:any_2], key:int, pieces:int):bat[:any_2] \
",	"PARThashkeyed;",	"Perform a value based partition "	]  [ \
"partition",	"slice",	"pattern partition.slice(b:bat[:any_2]):bat[:oid]... \
",	"PARTslice;",	"Perform a value based partition to produce candidate lists "	] +[ \
"partition",	"slice",	"pattern partition.slice(b:bat[:any_2], key:int, \
pieces:int):bat[:oid] ",	"PARTslicekeyed;",	"Perform a value based partition to \
produce candidate lists "	]  [ "pcre",	"imatch",	"command pcre.imatch(s:str, \
pat:str):bit ",	"PCREimatch;",	"Caseless Perl Compatible Regular Expression pattern \
matching against a string"	]  [ "pcre",	"index",	"command pcre.index(pat:pcre, \
s:str):int ",	"PCREindex;",	"match a pattern, return matched position (or 0 when not \
found)"	]  [ "pcre",	"match",	"command pcre.match(s:str, pat:str):bit \
",	"PCREmatch;",	"Perl Compatible Regular Expression pattern matching against a \
                string"	]
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1597,7 +1597,9 @@ str OPTvolcanoImplementation(Client cntx
 str OPTwlcImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str OPTwrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str PARThash(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
+str PARThashkeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str PARTslice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
+str PARTslicekeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str PCREilike2(bit *ret, const str *s, const str *pat);
 str PCREilike3(bit *ret, const str *s, const str *pat, const str *esc);
 str PCREimatch(bit *ret, const str *val, const str *pat);
diff --git a/monetdb5/modules/mal/Tests/hashpartition.malC \
                b/monetdb5/modules/mal/Tests/hashpartition.malC
--- a/monetdb5/modules/mal/Tests/hashpartition.malC
+++ b/monetdb5/modules/mal/Tests/hashpartition.malC
@@ -42,6 +42,8 @@ io.print(c2);
 io.print(c3);
 io.print(c4);
 io.print(c5);
+k1 := partition.hash(b,0,5);
+io.print(k1);
 
 (o1,o2):= partition.slice(b);
 io.print(o1);
@@ -53,6 +55,8 @@ io.print(o2);
 io.print(o3);
 io.print(o4);
 io.print(o5);
+s1 := partition.slice(b,0,5);
+io.print(s1);
 
 end tst;
 
diff --git a/monetdb5/modules/mal/Tests/hashpartition.stable.out \
                b/monetdb5/modules/mal/Tests/hashpartition.stable.out
--- a/monetdb5/modules/mal/Tests/hashpartition.stable.out
+++ b/monetdb5/modules/mal/Tests/hashpartition.stable.out
@@ -150,6 +150,16 @@ Ready.
 [ 5@0,	29	]
 #--------------------------#
 # h	t  # name
+# void	int  # type
+#--------------------------#
+[ 0@0,	0	]
+[ 1@0,	5	]
+[ 2@0,	10	]
+[ 3@0,	15	]
+[ 4@0,	20	]
+[ 5@0,	25	]
+#--------------------------#
+# h	t  # name
 # void	oid  # type
 #--------------------------#
 [ 0@0,	0@0	]
@@ -236,6 +246,16 @@ Ready.
 [ 3@0,	19@0	]
 [ 4@0,	24@0	]
 [ 5@0,	29@0	]
+#--------------------------#
+# h	t  # name
+# void	oid  # type
+#--------------------------#
+[ 0@0,	0@0	]
+[ 1@0,	5@0	]
+[ 2@0,	10@0	]
+[ 3@0,	15@0	]
+[ 4@0,	20@0	]
+[ 5@0,	25@0	]
 
 # 14:09:25 >  
 # 14:09:25 >  "Done."
diff --git a/monetdb5/modules/mal/partition.c b/monetdb5/modules/mal/partition.c
--- a/monetdb5/modules/mal/partition.c
+++ b/monetdb5/modules/mal/partition.c
@@ -29,6 +29,19 @@
         }                           \
     } while (0)
 
+/*
+ * keyedhashpartition(TYPE): append to bn every value of b whose hash,
+ * reduced modulo `pieces`, equals `key`.  Expects b, bn, bi, p, r, key,
+ * pieces and msg in the enclosing scope; on a failing BUNappend it sets
+ * msg and stops the scan.
+ * The remainder is taken directly on the value returned by mix_TYPE()
+ * (assumed to be an unsigned hash -- confirm in gdk_hash.h) instead of
+ * first storing it in a signed TYPE: that store could make the hash
+ * negative, giving a negative remainder that matches no key, so the row
+ * would be silently dropped.
+ */
+#define keyedhashpartition(TYPE)					\
+    do {								\
+	p = BATcount(b);						\
+	bi = bat_iterator(b);						\
+	for (r = 0; r < p; r++) {					\
+	    TYPE *v = (TYPE *) BUNtloc(bi, r);				\
+	    if (mix_##TYPE(*v) % (unsigned int) pieces == (unsigned int) key && \
+		BUNappend(bn, (void *) v, FALSE) != GDK_SUCCEED) {	\
+		msg = createException(MAL, "partition.hash", "Storage failed"); \
+		break;							\
+	    }								\
+	}								\
+    } while (0)
+
 #define slicepartition(TYPE)                         		\
     do { p= BATcount(b);                               		\
 	 bi = bat_iterator(b);					\
@@ -42,10 +55,24 @@
         }                           \
     } while (0)
 
+/*
+ * keyedslicepartition(TYPE): append to the oid BAT bn the position r of
+ * every value of b whose hash, reduced modulo `pieces`, equals `key`,
+ * producing a candidate list for that single partition.  Expects b, bn,
+ * bi, p, r, key, pieces and msg in the enclosing scope; on a failing
+ * BUNappend it sets msg and stops the scan.
+ * As in keyedhashpartition, the remainder is taken on the unsigned hash
+ * returned by mix_TYPE() (assumption -- confirm in gdk_hash.h) so that no
+ * row gets a negative remainder and is silently lost.
+ * NOTE(review): r counts from 0, not from b->hseqbase; confirm callers
+ * only use BATs with hseqbase 0 (presumably the same as PARTslice).
+ */
+#define keyedslicepartition(TYPE)					\
+    do {								\
+	p = BATcount(b);						\
+	bi = bat_iterator(b);						\
+	for (r = 0; r < p; r++) {					\
+	    TYPE *v = (TYPE *) BUNtloc(bi, r);				\
+	    if (mix_##TYPE(*v) % (unsigned int) pieces == (unsigned int) key && \
+		BUNappend(bn, (void *) &r, FALSE) != GDK_SUCCEED) {	\
+		msg = createException(MAL, "partition.slice", "Storage failed"); \
+		break;							\
+	    }								\
+	}								\
+    } while (0)
+
 str
 PARThash(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
 	bat *ret = getArgReference_bat(stk,pci,0);
+	int bid = *getArgReference_bat(stk,pci,pci->retc);
 	int	i, pieces;
 	BAT *b, *bn[MAXPART];
 	BUN r, p; 
@@ -58,7 +85,7 @@ PARThash(Client cntxt, MalBlkPtr mb, Mal
 	pieces = pci->retc;
 	if ( pieces >= MAXPART)
 		throw(MAL,"partition.hash","too many partitions");
-	b = BATdescriptor( stk->stk[getArg(pci, pci->retc)].val.ival);
+	b = BATdescriptor( bid);
 	if ( b == NULL)
 		throw(MAL, "partition.hash", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
 
@@ -111,11 +138,73 @@ PARThash(Client cntxt, MalBlkPtr mb, Mal
 	BBPunfix(b->batCacheid);
 	return msg;
 }
+/*
+ * PARThashkeyed implements partition.hash(b:bat[:any_2], key:int, pieces:int).
+ * Instead of producing all `pieces` partitions at once (as PARThash does),
+ * it returns only the values of b whose hash modulo `pieces` equals `key`,
+ * as a new transient BAT of b's type.
+ * Arguments: 0 = result bat id, 1 = input bat id, 2 = key, 3 = pieces.
+ * Errors: pieces outside [1, MAXPART), key outside [0, pieces), missing
+ * input BAT, allocation failure, storage failure, or a storage type other
+ * than bte/sht/int/lng (and hge when available).  On error no result BAT
+ * is handed out.
+ */
+str
+PARThashkeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	bat *ret = getArgReference_bat(stk,pci,0);
+	bat bid = *getArgReference_bat(stk,pci,1);
+	int key = *getArgReference_int(stk,pci,2);
+	int pieces = *getArgReference_int(stk,pci,3);
+	BAT *b, *bn;
+	BUN r, p, cap;
+	BATiter bi;
+	str msg = MAL_SUCCEED;
+
+	(void) cntxt;
+	(void) mb;
+	if ( pieces >= MAXPART)
+		throw(MAL,"partition.hash","too many partitions");
+	if ( pieces < 1)
+		throw(MAL,"partition.hash","illegal number of partitions");
+	/* a key outside [0, pieces) can never match any value; report it
+	 * instead of silently returning an empty partition */
+	if ( key < 0 || key >= pieces)
+		throw(MAL,"partition.hash","illegal partition key");
+	b = BATdescriptor( bid);
+	if ( b == NULL)
+		throw(MAL, "partition.hash", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+
+	/* expect roughly 1/pieces of the input, plus 10% slack */
+	cap = (BUN) (BATcount(b) / pieces * 1.1);
+	bn = COLnew(0, b->ttype, cap, TRANSIENT);
+	if (bn == NULL){
+		BBPunfix(b->batCacheid);
+		throw(MAL, "partition.hash", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+	}
+	if ( b->tvheap && bn->tvheap && ATOMstorage(b->ttype) != TYPE_str){
+		if (HEAPextend(bn->tvheap, b->tvheap->size, TRUE) != GDK_SUCCEED) {
+			BBPunfix(b->batCacheid);
+			BBPunfix(bn->batCacheid);
+			throw(MAL, "partition.hash", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+		}
+	}
+	/* collect the elements that belong to partition `key` */
+	switch(ATOMstorage(b->ttype)){
+		case TYPE_bte:
+			keyedhashpartition(bte);
+			break;
+		case TYPE_sht:
+			keyedhashpartition(sht);
+			break;
+		case TYPE_int:
+			keyedhashpartition(int);
+			break;
+		case TYPE_lng:
+			keyedhashpartition(lng);
+			break;
+#ifdef HAVE_HGE
+		case TYPE_hge:
+			keyedhashpartition(hge);
+			break;
+#endif
+		default:
+			msg = createException(MAL,"partition.hash","Non-supported type");
+	}
+
+	if (msg != MAL_SUCCEED) {
+		/* release the (partially filled) result instead of leaking it */
+		BBPunfix(bn->batCacheid);
+	} else {
+		BBPkeepref(*ret = bn->batCacheid);
+	}
+	BBPunfix(b->batCacheid);
+	return msg;
+}
 
 str
 PARTslice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
 	bat *ret = getArgReference_bat(stk,pci,0);
+	int bid = *getArgReference_bat(stk,pci,pci->retc);
 	int	i, pieces;
 	BAT *b, *bn[MAXPART];
 	BUN r, p; 
@@ -128,7 +217,7 @@ PARTslice(Client cntxt, MalBlkPtr mb, Ma
 	pieces = pci->retc;
 	if ( pieces >= MAXPART)
 		throw(MAL,"partition.slice","too many partitions");
-	b = BATdescriptor( stk->stk[getArg(pci, pci->retc)].val.ival);
+	b = BATdescriptor( bid);
 	if ( b == NULL)
 		throw(MAL, "partition.slice", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
 
@@ -172,3 +261,57 @@ PARTslice(Client cntxt, MalBlkPtr mb, Ma
 	BBPunfix(b->batCacheid);
 	return msg;
 }
+
+/*
+ * PARTslicekeyed implements partition.slice(b:bat[:any_2], key:int, pieces:int).
+ * It returns an oid BAT (candidate list) with the positions of the values of
+ * b whose hash modulo `pieces` equals `key`, i.e. a single partition of what
+ * PARTslice produces for all partitions at once.
+ * Arguments: 0 = result bat id, 1 = input bat id, 2 = key, 3 = pieces.
+ * Errors: pieces outside [1, MAXPART), key outside [0, pieces), missing
+ * input BAT, allocation failure, storage failure, or a storage type other
+ * than bte/sht/int/lng (and hge when available).  On error no result BAT
+ * is handed out.
+ */
+str
+PARTslicekeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	bat *ret = getArgReference_bat(stk,pci,0);
+	bat bid = *getArgReference_bat(stk,pci,1);
+	int key = *getArgReference_int(stk,pci,2);
+	int pieces = *getArgReference_int(stk,pci,3);
+	BAT *b, *bn;
+	BUN r, p, cap;
+	BATiter bi;
+	str msg = MAL_SUCCEED;
+
+	(void) cntxt;
+	(void) mb;
+	if ( pieces >= MAXPART)
+		throw(MAL,"partition.slice","too many partitions");
+	if ( pieces < 1)
+		throw(MAL,"partition.slice","illegal number of partitions");
+	/* a key outside [0, pieces) can never match any value; report it
+	 * instead of silently returning an empty candidate list */
+	if ( key < 0 || key >= pieces)
+		throw(MAL,"partition.slice","illegal partition key");
+	b = BATdescriptor( bid);
+	if ( b == NULL)
+		throw(MAL, "partition.slice", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+
+	/* expect roughly 1/pieces of the input, plus 10% slack */
+	cap = (BUN) (BATcount(b) / pieces * 1.1);
+	bn = COLnew(0, TYPE_oid, cap, TRANSIENT);
+	if (bn == NULL){
+		BBPunfix(b->batCacheid);
+		throw(MAL, "partition.slice", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+	}
+	/* collect the positions that belong to partition `key` */
+	switch(ATOMstorage(b->ttype)){
+		case TYPE_bte:
+			keyedslicepartition(bte);
+			break;
+		case TYPE_sht:
+			keyedslicepartition(sht);
+			break;
+		case TYPE_int:
+			keyedslicepartition(int);
+			break;
+		case TYPE_lng:
+			keyedslicepartition(lng);
+			break;
+#ifdef HAVE_HGE
+		case TYPE_hge:
+			keyedslicepartition(hge);
+			break;
+#endif
+		default:
+			msg = createException(MAL,"partition.slice","Non-supported type");
+	}
+
+	if (msg != MAL_SUCCEED) {
+		/* release the (partially filled) result instead of leaking it */
+		BBPunfix(bn->batCacheid);
+	} else {
+		BBPkeepref(*ret = bn->batCacheid);
+	}
+	BBPunfix(b->batCacheid);
+	return msg;
+}
diff --git a/monetdb5/modules/mal/partition.h b/monetdb5/modules/mal/partition.h
--- a/monetdb5/modules/mal/partition.h
+++ b/monetdb5/modules/mal/partition.h
@@ -14,6 +14,8 @@
 #include "mal_interpreter.h"
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic