[prev in list] [next in list] [prev in thread] [next in thread]
List: monetdb-checkins
Subject: MonetDB: partition - Added single key partitioning
From: Martin Kersten <commits+mk=cwi.nl () monetdb ! org>
Date: 2017-12-31 13:23:02
Message-ID: hg.94a6c138fe70.1514726582.6315528441665844383 () monetdb-vm0 ! spin-off ! cwi ! nl
[Download RAW message or body]
Changeset: 94a6c138fe70 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=94a6c138fe70
Modified Files:
clients/Tests/MAL-signatures.stable.out
clients/Tests/MAL-signatures.stable.out.int128
clients/Tests/exports.stable.out
monetdb5/modules/mal/Tests/hashpartition.malC
monetdb5/modules/mal/Tests/hashpartition.stable.out
monetdb5/modules/mal/partition.c
monetdb5/modules/mal/partition.h
monetdb5/modules/mal/partition.mal
Branch: partition
Log Message:
Added single key partitioning
As an alternative to producing all partitions at once, a single partition can now be cut out by its hash key (partition `key` out of `pieces`).
diffs (truncated from 325 to 300 lines):
diff --git a/clients/Tests/MAL-signatures.stable.out \
b/clients/Tests/MAL-signatures.stable.out
--- a/clients/Tests/MAL-signatures.stable.out
+++ b/clients/Tests/MAL-signatures.stable.out
@@ -8228,7 +8228,9 @@ Ready.
[ "optimizer", "wlc", "pattern optimizer.wlc():str ", "OPTwrapper;", "" ]
[ "optimizer", "wlc", "pattern optimizer.wlc(mod:str, fcn:str):str \
", "OPTwrapper;", "Inject the workload capture-replay primitives." ] [ \
"partition", "hash", "pattern partition.hash(b:bat[:any_2]):bat[:any_2]... \
", "PARThash;", "Perform a value based partition " ] +[ "partition", "hash", "pattern \
partition.hash(b:bat[:any_2], key:int, pieces:int):bat[:any_2] \
", "PARThashkeyed;", "Perform a value based partition " ] [ \
"partition", "slice", "pattern partition.slice(b:bat[:any_2]):bat[:oid]... \
", "PARTslice;", "Perform a value based partition to produce candidate lists " ] +[ \
"partition", "slice", "pattern partition.slice(b:bat[:any_2], key:int, \
pieces:int):bat[:oid] ", "PARTslicekeyed;", "Perform a value based partition to \
produce candidate lists " ] [ "pcre", "imatch", "command pcre.imatch(s:str, \
pat:str):bit ", "PCREimatch;", "Caseless Perl Compatible Regular Expression pattern \
matching against a string" ] [ "pcre", "index", "command pcre.index(pat:pcre, \
s:str):int ", "PCREindex;", "match a pattern, return matched position (or 0 when not \
found)" ] [ "pcre", "match", "command pcre.match(s:str, pat:str):bit \
", "PCREmatch;", "Perl Compatible Regular Expression pattern matching against a \
string" ]
diff --git a/clients/Tests/MAL-signatures.stable.out.int128 \
b/clients/Tests/MAL-signatures.stable.out.int128
--- a/clients/Tests/MAL-signatures.stable.out.int128
+++ b/clients/Tests/MAL-signatures.stable.out.int128
@@ -10590,7 +10590,9 @@ Ready.
[ "optimizer", "wlc", "pattern optimizer.wlc():str ", "OPTwrapper;", "" ]
[ "optimizer", "wlc", "pattern optimizer.wlc(mod:str, fcn:str):str \
", "OPTwrapper;", "Inject the workload capture-replay primitives." ] [ \
"partition", "hash", "pattern partition.hash(b:bat[:any_2]):bat[:any_2]... \
", "PARThash;", "Perform a value based partition " ] +[ "partition", "hash", "pattern \
partition.hash(b:bat[:any_2], key:int, pieces:int):bat[:any_2] \
", "PARThashkeyed;", "Perform a value based partition " ] [ \
"partition", "slice", "pattern partition.slice(b:bat[:any_2]):bat[:oid]... \
", "PARTslice;", "Perform a value based partition to produce candidate lists " ] +[ \
"partition", "slice", "pattern partition.slice(b:bat[:any_2], key:int, \
pieces:int):bat[:oid] ", "PARTslicekeyed;", "Perform a value based partition to \
produce candidate lists " ] [ "pcre", "imatch", "command pcre.imatch(s:str, \
pat:str):bit ", "PCREimatch;", "Caseless Perl Compatible Regular Expression pattern \
matching against a string" ] [ "pcre", "index", "command pcre.index(pat:pcre, \
s:str):int ", "PCREindex;", "match a pattern, return matched position (or 0 when not \
found)" ] [ "pcre", "match", "command pcre.match(s:str, pat:str):bit \
", "PCREmatch;", "Perl Compatible Regular Expression pattern matching against a \
string" ]
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1597,7 +1597,9 @@ str OPTvolcanoImplementation(Client cntx
str OPTwlcImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str OPTwrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
str PARThash(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
+str PARThashkeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
str PARTslice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
+str PARTslicekeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
str PCREilike2(bit *ret, const str *s, const str *pat);
str PCREilike3(bit *ret, const str *s, const str *pat, const str *esc);
str PCREimatch(bit *ret, const str *val, const str *pat);
diff --git a/monetdb5/modules/mal/Tests/hashpartition.malC \
b/monetdb5/modules/mal/Tests/hashpartition.malC
--- a/monetdb5/modules/mal/Tests/hashpartition.malC
+++ b/monetdb5/modules/mal/Tests/hashpartition.malC
@@ -42,6 +42,8 @@ io.print(c2);
io.print(c3);
io.print(c4);
io.print(c5);
+k1 := partition.hash(b,0,5);
+io.print(k1);
(o1,o2):= partition.slice(b);
io.print(o1);
@@ -53,6 +55,8 @@ io.print(o2);
io.print(o3);
io.print(o4);
io.print(o5);
+s1 := partition.slice(b,0,5);
+io.print(s1);
end tst;
diff --git a/monetdb5/modules/mal/Tests/hashpartition.stable.out \
b/monetdb5/modules/mal/Tests/hashpartition.stable.out
--- a/monetdb5/modules/mal/Tests/hashpartition.stable.out
+++ b/monetdb5/modules/mal/Tests/hashpartition.stable.out
@@ -150,6 +150,16 @@ Ready.
[ 5@0, 29 ]
#--------------------------#
# h t # name
+# void int # type
+#--------------------------#
+[ 0@0, 0 ]
+[ 1@0, 5 ]
+[ 2@0, 10 ]
+[ 3@0, 15 ]
+[ 4@0, 20 ]
+[ 5@0, 25 ]
+#--------------------------#
+# h t # name
# void oid # type
#--------------------------#
[ 0@0, 0@0 ]
@@ -236,6 +246,16 @@ Ready.
[ 3@0, 19@0 ]
[ 4@0, 24@0 ]
[ 5@0, 29@0 ]
+#--------------------------#
+# h t # name
+# void oid # type
+#--------------------------#
+[ 0@0, 0@0 ]
+[ 1@0, 5@0 ]
+[ 2@0, 10@0 ]
+[ 3@0, 15@0 ]
+[ 4@0, 20@0 ]
+[ 5@0, 25@0 ]
# 14:09:25 >
# 14:09:25 > "Done."
diff --git a/monetdb5/modules/mal/partition.c b/monetdb5/modules/mal/partition.c
--- a/monetdb5/modules/mal/partition.c
+++ b/monetdb5/modules/mal/partition.c
@@ -29,6 +29,19 @@
} \
} while (0)
+/* Append to bn every value of b whose mixed hash falls into partition `key`
+ * out of `pieces`.  Expands inside PARThashkeyed and uses its locals
+ * b, bn, bi, p, r, key, pieces and msg; on an append failure it sets msg
+ * and stops scanning.
+ * The remainder is normalised into [0, pieces): the mixed hash is stored in
+ * a signed TYPE and may be negative, and a negative `%` result never equals
+ * a non-negative key, which would drop those rows from every partition. */
+#define keyedhashpartition(TYPE) \
+    do { \
+        p = BATcount(b); \
+        bi = bat_iterator(b); \
+        for (r = 0; r < p; r++) { \
+            TYPE *v = (TYPE *) BUNtloc(bi, r); \
+            TYPE c = mix_##TYPE((*v)); \
+            int part = (int) (c % pieces); \
+            if (part < 0) \
+                part += pieces; \
+            if (part == key && BUNappend(bn, (void *) v, FALSE) != GDK_SUCCEED) { \
+                msg = createException(MAL, "partition.hash", "Storage failed"); \
+                break; \
+            } \
+        } \
+    } while (0)
+
#define slicepartition(TYPE) \
do { p= BATcount(b); \
bi = bat_iterator(b); \
@@ -42,10 +55,24 @@
} \
} while (0)
+/* Append to bn (an oid BAT) the position of every value of b whose mixed
+ * hash falls into partition `key` out of `pieces`.  Expands inside
+ * PARTslicekeyed and uses its locals b, bn, bi, p, r, key, pieces and msg;
+ * on an append failure it sets msg and stops scanning.
+ * The remainder is normalised into [0, pieces) so that rows whose mixed
+ * hash is negative in the signed TYPE still land in exactly one partition.
+ * NOTE(review): this appends the position r, not b->hseqbase + r; the result
+ * is only a valid candidate list for b when b starts at oid 0 -- confirm. */
+#define keyedslicepartition(TYPE) \
+    do { \
+        p = BATcount(b); \
+        bi = bat_iterator(b); \
+        for (r = 0; r < p; r++) { \
+            TYPE *v = (TYPE *) BUNtloc(bi, r); \
+            TYPE c = mix_##TYPE((*v)); \
+            int part = (int) (c % pieces); \
+            if (part < 0) \
+                part += pieces; \
+            if (part == key && BUNappend(bn, (void *) &r, FALSE) != GDK_SUCCEED) { \
+                msg = createException(MAL, "partition.slice", "Storage failed"); \
+                break; \
+            } \
+        } \
+    } while (0)
+
str
PARThash(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
bat *ret = getArgReference_bat(stk,pci,0);
+ int bid = *getArgReference_bat(stk,pci,pci->retc);
int i, pieces;
BAT *b, *bn[MAXPART];
BUN r, p;
@@ -58,7 +85,7 @@ PARThash(Client cntxt, MalBlkPtr mb, Mal
pieces = pci->retc;
if ( pieces >= MAXPART)
throw(MAL,"partition.hash","too many partitions");
- b = BATdescriptor( stk->stk[getArg(pci, pci->retc)].val.ival);
+ b = BATdescriptor( bid);
if ( b == NULL)
throw(MAL, "partition.hash", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
@@ -111,11 +138,73 @@ PARThash(Client cntxt, MalBlkPtr mb, Mal
BBPunfix(b->batCacheid);
return msg;
}
+/* partition.hash(b, key, pieces): return a new BAT holding those values of
+ * b that fall into hash partition `key`, where 0 <= key < pieces.
+ * Only bte, sht, int, lng (and hge when available) storage is supported.
+ * On error no result BAT is kept; the exception string is returned. */
+str
+PARThashkeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    bat *ret = getArgReference_bat(stk, pci, 0);
+    int bid = *getArgReference_bat(stk, pci, 1);
+    int key = *getArgReference_int(stk, pci, 2);
+    int pieces = *getArgReference_int(stk, pci, 3);
+    BAT *b, *bn;
+    BUN r, p, cap;
+    BATiter bi;
+    str msg = MAL_SUCCEED;
+
+    (void) cntxt;
+    (void) mb;
+    if (pieces < 1)
+        throw(MAL, "partition.hash", "illegal number of partitions");
+    if (pieces >= MAXPART)
+        throw(MAL, "partition.hash", "too many partitions");
+    /* an out-of-range key can never match and would silently yield an empty BAT */
+    if (key < 0 || key >= pieces)
+        throw(MAL, "partition.hash", "partition key out of range");
+    b = BATdescriptor(bid);
+    if (b == NULL)
+        throw(MAL, "partition.hash", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+
+    /* expect roughly 1/pieces of the input, plus 10% slack */
+    cap = (BUN) (BATcount(b) / pieces * 1.1);
+    bn = COLnew(0, b->ttype, cap, TRANSIENT);
+    if (bn == NULL) {
+        BBPunfix(b->batCacheid);
+        throw(MAL, "partition.hash", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+    }
+    /* select the rows of the requested partition; only fixed-width types
+     * are handled, so no variable-sized heap needs to be prepared */
+    switch (ATOMstorage(b->ttype)) {
+    case TYPE_bte:
+        keyedhashpartition(bte);
+        break;
+    case TYPE_sht:
+        keyedhashpartition(sht);
+        break;
+    case TYPE_int:
+        keyedhashpartition(int);
+        break;
+    case TYPE_lng:
+        keyedhashpartition(lng);
+        break;
+#ifdef HAVE_HGE
+    case TYPE_hge:
+        keyedhashpartition(hge);
+        break;
+#endif
+    default:
+        msg = createException(MAL, "partition.hash", "Non-supported type");
+        break;
+    }
+
+    BBPunfix(b->batCacheid);
+    if (msg != MAL_SUCCEED) {
+        /* do not hand out a partial result alongside an error */
+        BBPunfix(bn->batCacheid);
+        return msg;
+    }
+    BBPkeepref(*ret = bn->batCacheid);
+    return MAL_SUCCEED;
+}
str
PARTslice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
bat *ret = getArgReference_bat(stk,pci,0);
+ int bid = *getArgReference_bat(stk,pci,pci->retc);
int i, pieces;
BAT *b, *bn[MAXPART];
BUN r, p;
@@ -128,7 +217,7 @@ PARTslice(Client cntxt, MalBlkPtr mb, Ma
pieces = pci->retc;
if ( pieces >= MAXPART)
throw(MAL,"partition.slice","too many partitions");
- b = BATdescriptor( stk->stk[getArg(pci, pci->retc)].val.ival);
+ b = BATdescriptor( bid);
if ( b == NULL)
throw(MAL, "partition.slice", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
@@ -172,3 +261,57 @@ PARTslice(Client cntxt, MalBlkPtr mb, Ma
BBPunfix(b->batCacheid);
return msg;
}
+
+/* partition.slice(b, key, pieces): return a new oid BAT holding the
+ * positions of those values of b that fall into hash partition `key`,
+ * where 0 <= key < pieces.
+ * Only bte, sht, int, lng (and hge when available) storage is supported.
+ * On error no result BAT is kept; the exception string is returned. */
+str
+PARTslicekeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    bat *ret = getArgReference_bat(stk, pci, 0);
+    int bid = *getArgReference_bat(stk, pci, 1);
+    int key = *getArgReference_int(stk, pci, 2);
+    int pieces = *getArgReference_int(stk, pci, 3);
+    BAT *b, *bn;
+    BUN r, p, cap;
+    BATiter bi;
+    str msg = MAL_SUCCEED;
+
+    (void) cntxt;
+    (void) mb;
+    if (pieces < 1)
+        throw(MAL, "partition.slice", "illegal number of partitions");
+    if (pieces >= MAXPART)
+        throw(MAL, "partition.slice", "too many partitions");
+    /* an out-of-range key can never match and would silently yield an empty BAT */
+    if (key < 0 || key >= pieces)
+        throw(MAL, "partition.slice", "partition key out of range");
+    b = BATdescriptor(bid);
+    if (b == NULL)
+        throw(MAL, "partition.slice", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+
+    /* expect roughly 1/pieces of the input, plus 10% slack */
+    cap = (BUN) (BATcount(b) / pieces * 1.1);
+    bn = COLnew(0, TYPE_oid, cap, TRANSIENT);
+    if (bn == NULL) {
+        BBPunfix(b->batCacheid);
+        throw(MAL, "partition.slice", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+    }
+    /* collect the positions of the rows in the requested partition */
+    switch (ATOMstorage(b->ttype)) {
+    case TYPE_bte:
+        keyedslicepartition(bte);
+        break;
+    case TYPE_sht:
+        keyedslicepartition(sht);
+        break;
+    case TYPE_int:
+        keyedslicepartition(int);
+        break;
+    case TYPE_lng:
+        keyedslicepartition(lng);
+        break;
+#ifdef HAVE_HGE
+    case TYPE_hge:
+        keyedslicepartition(hge);
+        break;
+#endif
+    default:
+        msg = createException(MAL, "partition.slice", "Non-supported type");
+        break;
+    }
+
+    BBPunfix(b->batCacheid);
+    if (msg != MAL_SUCCEED) {
+        /* do not hand out a partial result alongside an error */
+        BBPunfix(bn->batCacheid);
+        return msg;
+    }
+    BBPkeepref(*ret = bn->batCacheid);
+    return MAL_SUCCEED;
+}
diff --git a/monetdb5/modules/mal/partition.h b/monetdb5/modules/mal/partition.h
--- a/monetdb5/modules/mal/partition.h
+++ b/monetdb5/modules/mal/partition.h
@@ -14,6 +14,8 @@
#include "mal_interpreter.h"
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic