[prev in list] [next in list] [prev in thread] [next in thread] 

List:       monetdb-checkins
Subject:    MonetDB: iot - React to new baskets and cleanup afterwards.
From:       Martin Kersten <commits+mk=cwi.nl () monetdb ! org>
Date:       2016-04-27 21:41:35
Message-ID: hg.a1679b57a54a.1461793295.6315528441665844383 () monetdb2 ! cwi-incubator ! nl
[Download RAW message or body]

Changeset: a1679b57a54a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a1679b57a54a
Modified Files:
	monetdb5/optimizer/opt_iot.c
	sql/backends/monet5/iot/Tests/webtest.sql
	sql/backends/monet5/iot/basket.c
	sql/backends/monet5/iot/basket.h
	sql/backends/monet5/iot/basket.mal
	sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:

React to new baskets and cleanup afterwards.


diffs (165 lines):

diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -164,6 +164,13 @@ OPTiotImplementation(Client cntxt, MalBl
 			if( getModuleId(p)== iotRef && getFunctionId(p)==errorRef)
 				noerror++;
 			if (p->token == ENDsymbol && btop > 0 && noerror==0) {
+				// empty all baskets used
+				for( j=0; j<btop; j++)
+				if( done[j]==0) {
+					p= newStmt(mb,basketRef,finishRef);
+					p= pushStr(mb,p, schemas[j]);
+					p= pushStr(mb,p, tables[j]);
+				}
 				/* catch any exception left behind */
 				r = newAssignment(mb);
 				j = getArg(r, 0) = newVariable(mb, GDKstrdup("SQLexception"), TYPE_str);
diff --git a/sql/backends/monet5/iot/Tests/webtest.sql \
                b/sql/backends/monet5/iot/Tests/webtest.sql
--- a/sql/backends/monet5/iot/Tests/webtest.sql
+++ b/sql/backends/monet5/iot/Tests/webtest.sql
@@ -1,11 +1,19 @@
 set schema iot;
 create stream table temps( iotclk timestamp, room string , temperature real);
+create table atemps( iotclk timestamp, cnt int , temperature real);
 -- remainder depends on location of the baskets root
 
 declare baskets string;
 set baskets= '/ufs/mk/baskets/measures/temperatures/';
 
 call iot.basket('iot','temps', concat(baskets,'1'));
-select * from temps;
-call iot.basket('iot','temps', concat(baskets,'1'));
-select * from temps;
+create procedure web00()
+begin
+    insert into atemps select min(iotclk), count(*), avg(temperature) from temps;
+end;
+
+call iot.query('iot','web00');
+
+select * from  iot.baskets();
+select * from  iot.queries();
+
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -95,6 +95,7 @@ static str
 BSKTnewbasket(sql_schema *s, sql_table *t)
 {
 	int i, idx;
+	int colcnt=0;
 	node *o;
 
 	// Don't introduce the same basket twice
@@ -117,10 +118,10 @@ BSKTnewbasket(sql_schema *s, sql_table *
 			MT_lock_unset(&iotLock);
 			throw(MAL,"baskets.register","Unsupported type %d",tpe);
 		}
-		baskets[idx].count++;
+		colcnt++;
 	}
 	// collect the column names
-	baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * (baskets[idx].count+1));
+	baskets[idx].cols = (str*) GDKzalloc(sizeof(str) * (colcnt+1));
 	for (i=0, o = t->columns.set->h; o; o = o->next){
         sql_column *col = o->data;
 		baskets[idx].cols[i++]=  col->base.name;
@@ -510,6 +511,8 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
 		assert( access (buf,R_OK) == 0);
 		//unlink(buf);
 	}
+	baskets[bskt].status = BSKTAVAILABLE;
+	baskets[bskt].count = cnt;
 
 recover:
 	/* reset all BATs when they are misaligned or error occurred */
@@ -526,6 +529,42 @@ recover:
     (void) mb;
     return msg;
 }
+
+/* remove tuples from a basket according to the sliding policy */
+str
+BSKTfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    str sch = *getArgReference_str(stk, pci, 1);
+    str tbl = *getArgReference_str(stk, pci, 2);
+	BAT *b;
+	node *n;
+	mvc *m = NULL;
+	str msg;
+	int bskt;
+
+	(void) mb;
+	(void) stk;
+	(void) pci;
+
+	msg= getSQLContext(cntxt,NULL, &m, NULL);
+    bskt = BSKTlocate(sch,tbl);
+	if (bskt == 0)
+		throw(SQL, "iot.finish", "Could not find the basket %s.%s",sch,tbl);
+
+	if( msg ==MAL_SUCCEED)
+	/* reset all stream BATs to empty*/
+	for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+		sql_column *c = n->data;
+		b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+		assert( b );
+		// use the proper basket policy
+		BATsetcount(b,0);
+		BBPunfix(b->batCacheid);
+	}
+	baskets[bskt].count= 0;
+	return msg;
+}
+
 str
 BSKTdump(void *ret)
 {
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -84,7 +84,7 @@ iot_export str BSKTwindow(Client cntxt, 
 
 iot_export str BSKTtable( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 iot_export str BSKTtableerrors(bat *nmeId, bat *errorId);
-iot_export str BSKTerror(void  *ret, str *sch, str *fcn, str *msg);
+iot_export str BSKTfinish( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 
 //iot_export str BSKTnewbasket(sql_schema *s, sql_table *t);
 iot_export str BSKTdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -81,6 +81,10 @@ command reset():void
 address BSKTreset
 comment "Remove all baskets";
 
+pattern finish(sch:str, tbl:str):void
+address BSKTfinish
+comment "Empty the basket using the prevaling policy";
+
 pattern iot.basket(sch:str, tbl:str, dir:str):void
 address BSKTpushBasket
 comment "Push a directory with the binary files";
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -335,6 +335,7 @@ PNexecute( void *n)
 	_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s transition \
done:%s\n",node->modname, node->fcnname, (msg != MAL_SUCCEED?msg:""));  
 	MT_lock_set(&iotLock);
+	// empty the baskets according to their policy
 	for ( i=0; i< j &&  node->enabled && node->places[i]; i++) {
 		idx = node->places[i];
 		baskets[idx].status = BSKTAVAILABLE;
@@ -398,7 +399,7 @@ PNscheduler(void *dummy)
 					}
 				} else
 				/* consider baskets that are properly filled */
-				if (baskets[idx].threshold > baskets[idx].count){
+				if (baskets[idx].threshold > baskets[idx].count || baskets[idx].count == 0){
 					pnet[i].enabled = 0;
 					break;
 				}
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list


[prev in list] [next in list] [prev in thread] [next in thread] 

Configure | About | News | Add a list | Sponsored by KoreLogic