[prev in list] [next in list] [prev in thread] [next in thread]
List: monetdb-checkins
Subject: MonetDB: iot - React to new baskets and clean up afterwards.
From: Martin Kersten <commits+mk=cwi.nl () monetdb ! org>
Date: 2016-04-27 21:41:35
Message-ID: hg.a1679b57a54a.1461793295.6315528441665844383 () monetdb2 ! cwi-incubator ! nl
[Download RAW message or body]
Changeset: a1679b57a54a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a1679b57a54a
Modified Files:
monetdb5/optimizer/opt_iot.c
sql/backends/monet5/iot/Tests/webtest.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:
React to new baskets and clean up afterwards.
diffs (165 lines):
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -164,6 +164,13 @@ OPTiotImplementation(Client cntxt, MalBl
if( getModuleId(p)== iotRef && getFunctionId(p)==errorRef)
noerror++;
if (p->token == ENDsymbol && btop > 0 && noerror==0) {
+ // empty all baskets used
+ for( j=0; j<btop; j++)
+ if( done[j]==0) {
+ p= newStmt(mb,basketRef,finishRef);
+ p= pushStr(mb,p, schemas[j]);
+ p= pushStr(mb,p, tables[j]);
+ }
/* catch any exception left behind */
r = newAssignment(mb);
j = getArg(r, 0) = newVariable(mb, GDKstrdup("SQLexception"), TYPE_str);
diff --git a/sql/backends/monet5/iot/Tests/webtest.sql \
b/sql/backends/monet5/iot/Tests/webtest.sql
--- a/sql/backends/monet5/iot/Tests/webtest.sql
+++ b/sql/backends/monet5/iot/Tests/webtest.sql
@@ -1,11 +1,19 @@
set schema iot;
create stream table temps( iotclk timestamp, room string , temperature real);
+create table atemps( iotclk timestamp, cnt int , temperature real);
-- remainder depends on location of the baskets root
declare baskets string;
set baskets= '/ufs/mk/baskets/measures/temperatures/';
call iot.basket('iot','temps', concat(baskets,'1'));
-select * from temps;
-call iot.basket('iot','temps', concat(baskets,'1'));
-select * from temps;
+create procedure web00()
+begin
+ insert into atemps select min(iotclk), count(*), avg(temperature) from temps;
+end;
+
+call iot.query('iot','web00');
+
+select * from iot.baskets();
+select * from iot.queries();
+
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -95,6 +95,7 @@ static str
BSKTnewbasket(sql_schema *s, sql_table *t)
{
int i, idx;
+ int colcnt=0;
node *o;
// Don't introduce the same basket twice
@@ -117,10 +118,10 @@ BSKTnewbasket(sql_schema *s, sql_table *
MT_lock_unset(&iotLock);
throw(MAL,"baskets.register","Unsupported type %d",tpe);
}
- baskets[idx].count++;
+ colcnt++;
}
// collect the column names
- baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * (baskets[idx].count+1));
+ baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * (colcnt+1));
for (i=0, o = t->columns.set->h; o; o = o->next){
sql_column *col = o->data;
baskets[idx].cols[i++]= col->base.name;
@@ -510,6 +511,8 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
assert( access (buf,R_OK) == 0);
//unlink(buf);
}
+ baskets[bskt].status = BSKTAVAILABLE;
+ baskets[bskt].count = cnt;
recover:
/* reset all BATs when they are misaligned or error occurred */
@@ -526,6 +529,42 @@ recover:
(void) mb;
return msg;
}
+
+/* Empty the basket of stream table sch.tbl (MAL arguments 1 and 2) by
+ * resetting the tuple count of each of its column BATs to zero. No per-basket
+ * policy is applied here yet: the basket is always emptied completely.
+ * Returns MAL_SUCCEED, or an exception when the SQL context, the basket or
+ * one of its columns is unavailable; the basket count is cleared only after
+ * every column has been reset. */
+str
+BSKTfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	str sch = *getArgReference_str(stk, pci, 1);
+	str tbl = *getArgReference_str(stk, pci, 2);
+	BAT *b;
+	node *n;
+	mvc *m = NULL;
+	str msg;
+	int bskt;
+
+	(void) mb;
+	/* bail out first: m is needed below, and a later throw would drop msg */
+	msg = getSQLContext(cntxt, NULL, &m, NULL);
+	if (msg != MAL_SUCCEED)
+		return msg;
+	bskt = BSKTlocate(sch, tbl);
+	if (bskt == 0)
+		throw(SQL, "iot.finish", "Could not find the basket %s.%s", sch, tbl);
+	for (n = baskets[bskt].table->columns.set->h; n; n = n->next) {
+		sql_column *c = n->data;
+		b = store_funcs.bind_col(m->session->tr, c, RDONLY);
+		if (b == NULL) /* was an assert(); must also hold in NDEBUG builds */
+			throw(SQL, "iot.finish", "Could not access column %s.%s.%s", sch, tbl, c->base.name);
+		BATsetcount(b, 0);
+		BBPunfix(b->batCacheid);
+	}
+	baskets[bskt].count = 0;
+	return msg;
+}
+
str
BSKTdump(void *ret)
{
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -84,7 +84,7 @@ iot_export str BSKTwindow(Client cntxt,
iot_export str BSKTtable( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
iot_export str BSKTtableerrors(bat *nmeId, bat *errorId);
-iot_export str BSKTerror(void *ret, str *sch, str *fcn, str *msg);
+iot_export str BSKTfinish( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
//iot_export str BSKTnewbasket(sql_schema *s, sql_table *t);
iot_export str BSKTdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -81,6 +81,10 @@ command reset():void
address BSKTreset
comment "Remove all baskets";
+pattern finish(sch:str, tbl:str):void
+address BSKTfinish
+comment "Empty the basket using the prevaling policy";
+
pattern iot.basket(sch:str, tbl:str, dir:str):void
address BSKTpushBasket
comment "Push a directory with the binary files";
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -335,6 +335,7 @@ PNexecute( void *n)
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s transition \
done:%s\n",node->modname, node->fcnname, (msg != MAL_SUCCEED?msg:""));
MT_lock_set(&iotLock);
+ // empty the baskets according to their policy
for ( i=0; i< j && node->enabled && node->places[i]; i++) {
idx = node->places[i];
baskets[idx].status = BSKTAVAILABLE;
@@ -398,7 +399,7 @@ PNscheduler(void *dummy)
}
} else
/* consider baskets that are properly filled */
- if (baskets[idx].threshold > baskets[idx].count){
+ if (baskets[idx].threshold > baskets[idx].count || baskets[idx].count == 0){
pnet[i].enabled = 0;
break;
}
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list
[prev in list] [next in list] [prev in thread] [next in thread]
Configure |
About |
News |
Add a list |
Sponsored by KoreLogic