
List:       drbd-cvs
Subject:    [DRBD-cvs] drbd by lars; several "hotfixes": spin_lock_irq*SAVE*
From:       drbd-cvs@linbit.com
Date:       2004-04-24 16:35:27
Message-ID: 20040424163527.5C0D415DF5C@garcon.linbit.com

DRBD CVS committal

Author  : lars
Module  : drbd

Dir     : drbd/drbd


Modified Files:
      Tag: rel-0_7-branch
	drbd_actlog.c drbd_dsender.c drbd_fs.c drbd_int.h drbd_main.c 
	drbd_receiver.c drbd_req-2.4.c 


Log Message:
several "hotfixes":
	spin_lock_irq*SAVE* (see the sketch after this log message)
	cleanup on Unconfigure (important for non-module builds)

some "obviously correct" fixes and reordering of statements,

some debugging aid

some "FIXME" comments:
	currently we still ignore IO errors on metadata in most cases.
	we don't have the means to cope with a corrupted on-disk bitmap either.

the bitmap code really needs some cleanup!

this was a busy three-day session ...

	--lge
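
For readers unfamiliar with the irq-save locking variants, here is a minimal
sketch of the pattern behind the spin_lock_irq*SAVE* hotfixes; do_work() is a
hypothetical placeholder, not a DRBD function, and the lock name is only an
example taken from the diff below:

	/* before: spin_unlock_irq() unconditionally re-enables interrupts,
	 * which is only safe if the caller is known to run with
	 * interrupts enabled */
	spin_lock_irq(&mdev->al_lock);
	do_work();
	spin_unlock_irq(&mdev->al_lock);

	/* after: save and restore the caller's interrupt state,
	 * safe from any context */
	unsigned long flags;
	spin_lock_irqsave(&mdev->al_lock, flags);
	do_work();
	spin_unlock_irqrestore(&mdev->al_lock, flags);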

===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_actlog.c,v
retrieving revision 1.1.2.89
retrieving revision 1.1.2.90
diff -u -3 -r1.1.2.89 -r1.1.2.90
--- drbd_actlog.c	23 Apr 2004 10:26:20 -0000	1.1.2.89
+++ drbd_actlog.c	24 Apr 2004 16:35:21 -0000	1.1.2.90
@@ -41,6 +41,15 @@
 	struct buffer_head bh;
 	struct completion event;
 
+	/* just to play safe: fill it with zeroes. if you want to, you
+	 * could define it away based on "PARANOIA" or something. */
+	if (rw != WRITE) {
+		/* most often this is already mapped, so don't worry
+		 * about performance loss */
+		void *b = kmap(mdev->md_io_page);
+		memset(b,0,PAGE_SIZE);
+		kunmap(mdev->md_io_page);
+	}
 	init_completion(&event);
 	init_buffer(&bh, drbd_md_io_complete, &event);
 	bh.b_rdev = mdev->md_bdev;
@@ -64,6 +73,15 @@
 	struct bio_vec vec;
 	struct completion event;
 
+	/* just to play safe: fill it with zeroes. if you want to, you
+	 * could define it away based on "PARANOIA" or something. */
+	if (rw != WRITE) {
+		/* most often this is already mapped, so don't worry
+		 * about performance loss */
+		void *b = kmap(mdev->md_io_page);
+		memset(b,0,PAGE_SIZE);
+		kunmap(mdev->md_io_page);
+	}
 	bio_init(&bio);
 	bio.bi_io_vec = &vec;
 	vec.bv_page = mdev->md_io_page;
@@ -77,6 +95,10 @@
 	init_completion(&event);
 	bio.bi_private = &event;
 	bio.bi_end_io = drbd_md_io_complete;
+
+	INFO("%s [%d]:%s(,%ld,%s)\n",
+	     current->comm, current->pid, __func__,
+	     sector, rw ? "WRITE" : "READ");
 #ifdef BIO_RW_SYNC
 	submit_bio(rw | (1 << BIO_RW_SYNC), &bio);
 #else
@@ -166,6 +188,8 @@
 		drbd_al_write_transaction(mdev,al_ext,enr);
 		mdev->al_writ_cnt++;
 
+		DUMPI(al_ext->lc_number);
+		DUMPI(mdev->act_log->new_number);
 		spin_lock_irq(&mdev->al_lock);
 		lc_changed(mdev->act_log,al_ext);
 		spin_unlock_irq(&mdev->al_lock);
@@ -196,6 +220,11 @@
 	spin_unlock_irqrestore(&mdev->al_lock,flags);
 }
 
+/*
+
+FIXME md_io might fail unnoticed!
+
+*/
 STATIC void
 drbd_al_write_transaction(struct Drbd_Conf *mdev,struct lc_element *updated,
 			  unsigned int new_enr)
@@ -246,6 +275,7 @@
 
 	sector = drbd_md_ss(mdev) + MD_AL_OFFSET + mdev->al_tr_pos ;
 
+	/* FIXME what if this fails ?? */
 	drbd_md_sync_page_io(mdev,sector,WRITE);
 
 	if( ++mdev->al_tr_pos > div_ceil(mdev->act_log->nr_elements,AL_EXTENTS_PT) ) {
@@ -256,39 +286,30 @@
 	up(&mdev->md_io_mutex);
 }
 
-/* In case this function returns 1 == success, the caller must do
-		kunmap(mdev->md_io_page);
-		up(&mdev->md_io_mutex);
- */
+/*
+
+FIXME md_io might fail unnoticed!
+
+*/
 STATIC int drbd_al_read_tr(struct Drbd_Conf *mdev,
-			   struct al_transaction** bp,
+			   struct al_transaction* b,
 			   int index)
 {
-	struct al_transaction* buffer;
 	sector_t sector;
 	int rv,i;
 	u32 xor_sum=0;
 
-	down(&mdev->md_io_mutex);
 	sector = drbd_md_ss(mdev) + MD_AL_OFFSET + index;
 
+	/* FIXME what if this fails ?? */
 	drbd_md_sync_page_io(mdev,sector,READ);
 
-	buffer = (struct al_transaction*)kmap(mdev->md_io_page);
-
-	rv = ( be32_to_cpu(buffer->magic) == DRBD_MAGIC );
+	rv = ( be32_to_cpu(b->magic) == DRBD_MAGIC );
 
 	for(i=0;i<AL_EXTENTS_PT+1;i++) {
-		xor_sum ^= be32_to_cpu(buffer->updates[i].extent);
-	}
-	rv &= (xor_sum == be32_to_cpu(buffer->xor_sum));
-
-	if(rv) {
-		*bp = buffer;
-	} else {
-		kunmap(mdev->md_io_page);
-		up(&mdev->md_io_mutex);
+		xor_sum ^= be32_to_cpu(b->updates[i].extent);
 	}
+	rv &= (xor_sum == be32_to_cpu(b->xor_sum));
 
 	return rv;
 }
@@ -304,13 +325,17 @@
 
 	mx = div_ceil(mdev->act_log->nr_elements,AL_EXTENTS_PT);
 
+	/* lock out all other meta data io for now,
+	 * and make sure the page is mapped.
+	 */
+	down(&mdev->md_io_mutex);
+	buffer = kmap(mdev->md_io_page);
+
 	// Find the valid transaction in the log
 	for(i=0;i<=mx;i++) {
-		if(!drbd_al_read_tr(mdev,&buffer,i)) continue;
+		if(!drbd_al_read_tr(mdev,buffer,i)) continue;
 		cnr = be32_to_cpu(buffer->tr_number);
 		// INFO("index %d valid tnr=%d\n",i,cnr);
-		kunmap(mdev->md_io_page);
-		up(&mdev->md_io_mutex);
 
 		if(cnr == -1) overflow=1;
 
@@ -327,23 +352,28 @@
 	if(from == -1 || to == -1) {
 		WARN("No usable activity log found.\n");
 
+		kunmap(mdev->md_io_page);
+		up(&mdev->md_io_mutex);
 		return;
 	}
 
 	// Read the valid transactions.
 	// INFO("Reading from %d to %d.\n",from,to);
 
+	/* this should better be handled by a for loop, no?
+	 */
 	i=from;
 	while(1) {
 		int j,pos;
 		unsigned int extent_nr;
 		unsigned int trn;
 
-		rv = drbd_al_read_tr(mdev,&buffer,i);
+		rv = drbd_al_read_tr(mdev,buffer,i);
 		ERR_IF(!rv) goto cancel;
 
 		trn=be32_to_cpu(buffer->tr_number);
 
+		spin_lock_irq(&mdev->al_lock);
 		for(j=0;j<AL_EXTENTS_PT+1;j++) {
 			pos = be32_to_cpu(buffer->updates[j].pos);
 			extent_nr = be32_to_cpu(buffer->updates[j].extent);
@@ -351,14 +381,10 @@
 			if(extent_nr == LC_FREE) continue;
 
 		       //if(j<3) INFO("T%03d S%03d=E%06d\n",trn,pos,extent_nr);
-			spin_lock_irq(&mdev->al_lock);
 			lc_set(mdev->act_log,extent_nr,pos);
-			spin_unlock_irq(&mdev->al_lock);
 			active_extents++;
 		}
-
-		kunmap(mdev->md_io_page);
-		up(&mdev->md_io_mutex);
+		spin_unlock_irq(&mdev->al_lock);
 
 		transactions++;
 
@@ -374,6 +400,10 @@
 		mdev->al_tr_pos=0;
 	}
 
+	/* ok, we are done with it */
+	kunmap(mdev->md_io_page);
+	up(&mdev->md_io_mutex);
+
 	INFO("Found %d transactions (%d active extents) in activity log.\n",
 	     transactions,active_extents);
 }
@@ -440,7 +470,7 @@
 
 	if( !inc_local_md_only(mdev) ) return;
 
-	exts = div_ceil(drbd_get_capacity(mdev->this_bdev), 
+	exts = div_ceil(drbd_get_capacity(mdev->this_bdev),
 			BM_EXTENT_SIZE >> 9 );
 
 	for(i=0;i<exts;i++) {
@@ -486,6 +516,9 @@
 
 /**
  * drbd_read_bm: Read the whole bitmap from its on disk location.
+
+FIXME md_io might fail unnoticed!
+
  */
 void drbd_read_bm(struct Drbd_Conf *mdev)
 {
@@ -496,10 +529,11 @@
 	int so = 0;
 
 	bm_i = 0;
-	bm_words = mdev->mbds_id->size/sizeof(unsigned long);
+	bm_words = mdev->mbds_id->size/sizeof(long);
 	bm = mdev->mbds_id->bm;
 
 	down(&mdev->md_io_mutex);
+	buffer = (unsigned long *)kmap(mdev->md_io_page);
 
 	while (1) {
 		want=min_t(int,512/sizeof(long),bm_words-bm_i);
@@ -508,18 +542,17 @@
 		sector = drbd_md_ss(mdev) + MD_BM_OFFSET + so;
 		so++;
 
+		/* FIXME what if this fails ?? */
 		drbd_md_sync_page_io(mdev,sector,READ);
 
-		buffer = (unsigned long *)kmap(mdev->md_io_page);
-
 		for(buf_i=0;buf_i<want;buf_i++) {
 			word = lel_to_cpu(buffer[buf_i]);
 			bits += hweight_long(word);
 			bm[bm_i++] = word;
 		}
-		kunmap(mdev->md_io_page);
 	}
 
+	kunmap(mdev->md_io_page);
 	up(&mdev->md_io_mutex);
 
 	mdev->rs_total = (bits << (BM_BLOCK_SIZE_B - 9)) +
@@ -537,30 +570,30 @@
  *       ATTENTION: Based on AL_EXTENT_SIZE, although the chunk
  *                  we write might represent more storage. 
  *                  ( actually AL_EXTENT_SIZE*EXTENTS_PER_SECTOR )
+
+FIXME md_io might fail unnoticed!
+
  */
 STATIC void drbd_update_on_disk_bm(struct Drbd_Conf *mdev,unsigned int enr)
 {
 	unsigned long * buffer, * bm;
-	int want,buf_i,bm_words,bm_i;
+	unsigned int want,buf_i,bm_words,bm_i;
 	sector_t sector;
 
 	D_ASSERT(atomic_read(&mdev->local_cnt)>0);
 	enr = (enr & ~(EXTENTS_PER_SECTOR-1) );
 
 	bm = mdev->mbds_id->bm;
-	bm_words = mdev->mbds_id->size/sizeof(unsigned long);
+	bm_words = mdev->mbds_id->size/sizeof(long);
 	bm_i = enr * BM_WORDS_PER_EXTENT ;
 
-	/* FIXME yes, this triggers
-	 * not exactly reproduceable, though :(
-	 * some error in param exchange,
-	 * bitmap not properly resized ...
-	 */
 	ERR_IF(bm_i >= bm_words) {
 		DUMPI(bm_i);
 		DUMPI(bm_words);
+		dump_stack();
+		return;
 	}
-	want=min_t(int,512/sizeof(long),bm_words-bm_i);
+	want=min_t(unsigned int,512/sizeof(long),bm_words-bm_i);
 
 	down(&mdev->md_io_mutex); // protects md_io_buffer
 	buffer = (unsigned long *)kmap(mdev->md_io_page);
@@ -573,6 +606,7 @@
 
 	sector = drbd_md_ss(mdev) + MD_BM_OFFSET + enr/EXTENTS_PER_SECTOR;
 
+	/* FIXME what if this fails ?? */
 	drbd_md_sync_page_io(mdev,sector,WRITE);
 	up(&mdev->md_io_mutex);
 
@@ -602,6 +636,7 @@
 	return 1;
 }
 
+
 /* ATTENTION. The AL's extents are 4MB each, while the extents in the  *
  * resync LRU-cache are 16MB each.                                     */
 STATIC void drbd_try_clear_on_disk_bm(struct Drbd_Conf *mdev,sector_t sector,
@@ -618,6 +653,11 @@
 	// a 16 MB extent border. (Currently this is true...)
 	enr = (sector >> (BM_EXTENT_SIZE_B-9));
 
+	/*
+	INFO("%s [%d]:%s(,%ld,%d)\n",
+	     current->comm, current->pid, __func__,
+	     sector, cleared);
+	*/
 	spin_lock_irqsave(&mdev->al_lock,flags);
 	ext = (struct bm_extent *) lc_get(mdev->resync,enr);
 	if (ext) {
@@ -625,10 +665,12 @@
 			ext->rs_left -= cleared;
 			D_ASSERT(ext->rs_left >= 0);
 		} else {
-			//WARN("Recounting sectors (resync LRU too small?)\n");
-			// This element should be in the cache 
+			WARN("Recounting sectors in %d (resync LRU too small?)\n", enr);
+			// This element should be in the cache
 			// since drbd_rs_begin_io() pulled it already in.
 			ext->rs_left = bm_count_sectors(mdev->mbds_id,enr);
+			DUMPI(ext->lce.lc_number);
+			DUMPI(mdev->resync->new_number);
 			lc_changed(mdev->resync,&ext->lce);
 		}
 		lc_put(mdev->resync,&ext->lce);
@@ -703,6 +745,10 @@
 	if (bm_ext) {
 		if(bm_ext->lce.lc_number != enr) {
 			bm_ext->rs_left = bm_count_sectors(mdev->mbds_id,enr);
+			/*
+			DUMPI(bm_ext->lce.lc_number);
+			DUMPI(mdev->resync->new_number);
+			*/
 			lc_changed(mdev->resync,(struct lc_element*)bm_ext);
 			wake_up(&mdev->al_wait);
 		}
@@ -728,7 +774,7 @@
 {
 	struct lc_element* al_ext;
 	int rv=0;
-	
+
 	spin_lock_irq(&mdev->al_lock);
 	if(unlikely(enr == mdev->act_log->new_number)) rv=1;
 	else {
@@ -776,11 +822,12 @@
 {
 	unsigned int enr = (sector >> (BM_EXTENT_SIZE_B-9));
 	struct bm_extent* bm_ext;
+	unsigned long flags;
 
-	spin_lock_irq(&mdev->al_lock);
+	spin_lock_irqsave(&mdev->al_lock,flags);
 	bm_ext = (struct bm_extent*) lc_find(mdev->resync,enr);
 	if(!bm_ext) {
-		spin_unlock_irq(&mdev->al_lock);
+		spin_unlock_irqrestore(&mdev->al_lock,flags);
 		ERR("drbd_rs_complete_io() called, but extent not found");
 		return;
 	}
@@ -792,7 +839,7 @@
 		wake_up(&mdev->al_wait);
 	}
 
-	spin_unlock_irq(&mdev->al_lock);
+	spin_unlock_irqrestore(&mdev->al_lock,flags);
 }
 
 /**
@@ -817,5 +864,5 @@
 	}
 
 	wake_up(&mdev->al_wait);
-	spin_unlock_irq(&mdev->al_lock);	
+	spin_unlock_irq(&mdev->al_lock);
 }
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/Attic/drbd_dsender.c,v
retrieving revision 1.1.2.91
retrieving revision 1.1.2.92
diff -u -3 -r1.1.2.91 -r1.1.2.92
--- drbd_dsender.c	16 Apr 2004 12:18:55 -0000	1.1.2.91
+++ drbd_dsender.c	24 Apr 2004 16:35:21 -0000	1.1.2.92
@@ -394,12 +394,17 @@
 	return ok;
 }
 
-int w_resync_inactive(drbd_dev *mdev, struct drbd_work *w, int unused)
+int w_resync_inactive(drbd_dev *mdev, struct drbd_work *w, int cancel)
 {
+	ERR_IF(cancel) return 1;
 	ERR("resync inactive, but callback triggered??\n");
 	return 0;
 }
 
+/* FIXME
+ * not used any longer, they now use e_end_resync_block.
+ * maybe remove again?
+ */
 int w_is_resync_read(drbd_dev *mdev, struct drbd_work *w, int unused)
 {
 	ERR("%s: Typecheck only, should never be called!\n", __FUNCTION__ );
@@ -419,6 +424,7 @@
 {
 	drbd_dev* mdev = (drbd_dev*) data;
 
+	D_ASSERT(list_empty(&mdev->resync_work.list));
 	if(unlikely(test_and_clear_bit(STOP_SYNC_TIMER,&mdev->flags))) {
 		mdev->resync_work.cb = w_resync_inactive;
 	} else {
@@ -594,6 +600,13 @@
 	return ok;
 }
 
+int w_send_write_hint(drbd_dev *mdev, struct drbd_work *w, int cancel)
+{
+	if (cancel) return 1;
+	NOT_IN_26(clear_bit(WRITE_HINT_QUEUED,&mdev->flags));
+	return drbd_send_short_cmd(mdev,WriteHint);
+}
+
 STATIC void drbd_global_lock(void)
 {
 	int i;
@@ -826,9 +839,12 @@
 	if(0) {
 	err:
 		ERR("A work callback returned not ok!\n");
+		// ?? drbd_thread_restart_nowait(&mdev->asender);
 		drbd_thread_restart_nowait(&mdev->receiver);
 	}
 
+	del_timer_sync(&mdev->resync_timer); // just in case...
+
 	while(!down_trylock(&mdev->data.work.s)) {
 		spin_lock_irq(&mdev->req_lock);
 		if (!list_empty(&mdev->data.work.q)) {
@@ -837,11 +853,9 @@
 			list_del_init(&w->list);
 		}
 		spin_unlock_irq(&mdev->req_lock);
-		
 		w->cb(mdev,w,1);
 	}
 
-	del_timer_sync(&mdev->resync_timer); // just in case...
 	INFO("worker terminated\n");
 
 	return 0;
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_fs.c,v
retrieving revision 1.28.2.79
retrieving revision 1.28.2.80
diff -u -3 -r1.28.2.79 -r1.28.2.80
--- drbd_fs.c	15 Apr 2004 12:01:15 -0000	1.28.2.79
+++ drbd_fs.c	24 Apr 2004 16:35:21 -0000	1.28.2.80
@@ -161,7 +161,7 @@
 			struct ioctl_disk_config * arg)
 {
 	NOT_IN_26(int err;) // unused in 26 ?? cannot believe it ...
-	int i,minor;
+	int i, md_gc_valid, minor;
 	enum ret_codes retcode;
 	struct disk_config new_conf;
 	struct file *filp = 0;
@@ -321,14 +321,26 @@
 
 	clear_bit(SENT_DISK_FAILURE,&mdev->flags);
 	set_bit(MD_IO_ALLOWED,&mdev->flags);
-	i = drbd_md_read(mdev);
+
+/* FIXME I think inc_local_md_only within drbd_md_read is misplaced.
+ * should go here, and the corresponding dec_local, too.
+ */
+
+	md_gc_valid = drbd_md_read(mdev);
+
+/* FIXME if (md_gc_valid < 0) META DATA IO NOT POSSIBLE! */
+
 	drbd_determin_dev_size(mdev);
-	if(i) drbd_read_bm(mdev);
+
+	if(md_gc_valid) drbd_read_bm(mdev);
 	else {
 		INFO("Assuming that all blocks are out of sync (aka FullSync)\n");
 		bm_fill_bm(mdev->mbds_id,-1);
 		mdev->rs_total = drbd_get_capacity(mdev->this_bdev);
 		drbd_write_bm(mdev);
+
+/* FIXME whipeout on disk activity log area */
+
 	}
 
 	if ( !mdev->act_log ||
@@ -761,7 +773,9 @@
 		drbd_thread_stop(&mdev->asender);
 		drbd_thread_stop(&mdev->receiver);
 
-		set_cstate(mdev,StandAlone);
+		if (test_bit(DISKLESS,&mdev->flags)) set_cstate(mdev,Unconfigured);
+		else set_cstate(mdev,StandAlone);
+
 		break;
 
 	case DRBD_IOCTL_UNCONFIG_DISK:
@@ -798,8 +812,9 @@
 		}
 
 		drbd_free_ll_dev(mdev);
-		mdev->la_size=0;
 
+/* FIXME race with sync start
+ */
 		if (mdev->cstate == Connected) drbd_send_param(mdev,0);
 		if (mdev->cstate == StandAlone) set_cstate(mdev,Unconfigured);
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_int.h,v
retrieving revision 1.58.2.145
retrieving revision 1.58.2.146
diff -u -3 -r1.58.2.145 -r1.58.2.146
--- drbd_int.h	23 Apr 2004 08:56:49 -0000	1.58.2.145
+++ drbd_int.h	24 Apr 2004 16:35:21 -0000	1.58.2.146
@@ -169,10 +169,10 @@
 #else
 	// at most one DBG(x) per t seconds
 #define C_DBG(t,x...) do { \
-	static unsigned long _j; \
+	static unsigned long _j = 0; \
 	if ((long)(jiffies-_j)< HZ*t) break; \
 	_j=jiffies; \
-	DBG(x); \
+	INFO(x); \
 } while (0)
 #endif
 
@@ -360,11 +360,7 @@
  *   PingAck
  *   BecomeSyncTarget
  *   BecomeSyncSource
- *   BecomeSec
  *   WriteHint
- *   SyncStop
- *   SyncCont
- *   SyncDone
  */
 
 /*
@@ -545,12 +541,12 @@
 // bitfield? enum?
 /* flag bits */
 #define ISSUE_BARRIER      0
-#define ISSUE_IO_HINT      1
+// #define ISSUE_IO_HINT      1		is now drbd_queue_work'ed
 #define SEND_PING          2
 #define WRITER_PRESENT     3
 #define STOP_SYNC_TIMER    4
 #define DO_NOT_INC_CONCNT  5
-#define WRITE_HINT_QUEUED  6
+#define WRITE_HINT_QUEUED  6		/* only relevant in 2.4 */
 #define DISKLESS           7
 #define PARTNER_DISKLESS   8
 #define PROCESS_EE_RUNNING 9
@@ -624,7 +620,10 @@
 	struct drbd_socket data; // for data/barrier/cstate/parameter packets
 	struct drbd_socket meta; // for ping/ack (metadata) packets
 	volatile unsigned long last_received; // in jiffies, either socket
-	struct drbd_work  resync_work,barrier_work;
+	volatile unsigned int ko_count;
+	struct drbd_work  resync_work,
+			  barrier_work,
+			  unplug_work;
 	struct timer_list resync_timer;
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
 	kdev_t backing_bdev;  // backing device
@@ -687,8 +686,7 @@
 	struct list_head read_ee;   // IO in progress
 	// struct list_head rdone_ee;  // send result or CondRequest
 	spinlock_t pr_lock;
-	// struct list_head app_reads; // FIXME broken on purpose by lge
-	struct list_head new_app_reads;
+	struct list_head app_reads;
 	struct list_head resync_reads;
 	int ee_vacant;
 	int ee_in_use;
@@ -744,6 +742,7 @@
 extern int drbd_send_bitmap(drbd_dev *mdev);
 extern void drbd_free_ll_dev(drbd_dev *mdev);
 extern int drbd_io_error(drbd_dev* mdev);
+extern void drbd_mdev_cleanup(drbd_dev *mdev);
 
 
 
@@ -848,6 +847,7 @@
 extern int w_resume_next_sg      (drbd_dev *, struct drbd_work *, int);
 extern int w_io_error            (drbd_dev *, struct drbd_work *, int);
 extern int w_try_send_barrier    (drbd_dev *, struct drbd_work *, int);
+extern int w_send_write_hint     (drbd_dev *, struct drbd_work *, int);
 
 // drbd_receiver.c
 extern int drbd_release_ee(drbd_dev* mdev,struct list_head* list);
@@ -949,6 +949,8 @@
 	spin_lock_irqsave(&mdev->req_lock,flags);
 	_set_cstate(mdev,ns);
 	spin_unlock_irqrestore(&mdev->req_lock,flags);
+	if (ns == Unconfigured)
+		drbd_mdev_cleanup(mdev);
 }
 
 /**
@@ -1006,6 +1008,13 @@
 }
 
 static inline void
+_drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
+{
+	list_add(&w->list,&q->q);
+	up(&q->s);
+}
+
+static inline void
 drbd_queue_work(drbd_dev *mdev, struct drbd_work_queue *q,
 		  struct drbd_work *w)
 {
@@ -1152,6 +1161,67 @@
 	mdev->rs_total +=
 		bm_set_bit(mdev, sector, blk_size, SS_OUT_OF_SYNC);
 }
+
+#if 0
+/*
+ * enable to dump information about every packet exchange.
+ */
+static inline void
+dump_packet(drbd_dev *mdev, struct socket *sock,
+	    int recv, Drbd_Polymorph_Packet *p)
+{
+	char *sockname = sock == mdev->meta.socket ? "meta" : "data";
+	int cmd = be16_to_cpu(p->head.command);
+	switch (cmd) {
+	case Ping:
+	case PingAck:
+	case BecomeSyncTarget:
+	case BecomeSyncSource:
+	case WriteHint:
+
+	case SyncParam:
+	case ReportParams:
+		INFO(" %s [%d] %s %s %s\n", current->comm, current->pid,
+		     sockname, recv?"<<<":">>>", cmdname(cmd));
+		break;
+
+	case Data:
+	case DataReply:
+	case RSDataReply:
+
+	case RecvAck:   /* yes I know. but it is the same layout */
+	case WriteAck:
+	case NegAck:
+
+	case DataRequest:
+	case RSDataRequest:
+		INFO(" %s [%d] %s %s %s (%lu,%lx)\n", current->comm, current->pid,
+		     sockname, recv?"<<<":">>>", cmdname(cmd),
+		     (long)be64_to_cpu(p->Data.sector), (long)p->Data.block_id
+		);
+		break;
+
+	case Barrier:
+	case BarrierAck:
+		INFO(" %s [%d] %s %s %s (%u)\n", current->comm, current->pid,
+		     sockname, recv?"<<<":">>>", cmdname(cmd),
+		     p->Barrier.barrier
+		);
+		break;
+
+	default:
+		INFO(" %s [%d] %s %s %s (%u)\n", current->comm, current->pid,
+		     sockname, recv?"<<<":">>>", cmdname(cmd), cmd
+		);
+		break;
+	}
+}
+#else
+static inline void
+dump_packet(drbd_dev *mdev, struct socket *sock,
+	    int recv, Drbd_Polymorph_Packet *p)   { /* DO NOTHING */ }
+#endif
+
 
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
 # define sector_div(n, b)( \
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_main.c,v
retrieving revision 1.73.2.153
retrieving revision 1.73.2.154
diff -u -3 -r1.73.2.153 -r1.73.2.154
--- drbd_main.c	24 Apr 2004 02:03:22 -0000	1.73.2.153
+++ drbd_main.c	24 Apr 2004 16:35:21 -0000	1.73.2.154
@@ -392,6 +392,7 @@
 	wake_up_interruptible(&mdev->cstate_wait);
 
 	if ( ( os==SyncSource || os==SyncTarget ) && ns <= Connected ) {
+		set_bit(STOP_SYNC_TIMER,&mdev->flags);
 		mdev->resync_work.cb = w_resume_next_sg;
 		_drbd_queue_work(&mdev->data.work,&mdev->resync_work);
 	}
@@ -526,9 +527,7 @@
 		ERR("short sent %s size=%d sent=%d\n",
 		    cmdname(cmd), (int)size, sent);
 	}
-	C_DBG(5,"on %s >>> %s l: %d\n",
-	    sock == mdev->meta.socket ? "msock" : "sock",
-	    cmdname(cmd), size-sizeof(Drbd_Header));
+	dump_packet(mdev,sock,0,(void*)h);
 	return ok;
 }
 
@@ -560,28 +559,6 @@
 	return ok;
 }
 
-/* for WriteHint, maybe others.
- * returns
- *   1 if nonblocking send was succesfull,
- *   0 if nonblocking send failed,
- * -EAGAIN if we did not get the send mutex
- */
-STATIC int drbd_send_cmd_dontwait(drbd_dev *mdev, struct socket *sock,
-		  Drbd_Packet_Cmd cmd, Drbd_Header* h, size_t size)
-{
-	int ok;
-	sigset_t old_blocked;
-
-	struct semaphore *mutex = sock == mdev->meta.socket ?
-		&mdev->meta.mutex : &mdev->data.mutex;
-	if (down_trylock(mutex)) return -EAGAIN;
-	old_blocked = block_sigs_but(DRBD_SHUTDOWNSIGMASK);
-	ok = _drbd_send_cmd(mdev,sock,cmd,h,size, MSG_DONTWAIT);
-	restore_old_sigset(old_blocked);
-	up  (mutex);
-	return ok;
-}
-
 int drbd_send_sync_param(drbd_dev *mdev, struct syncer_config *sc)
 {
 	Drbd_SyncParam_Packet p;
@@ -645,7 +622,7 @@
 
 	ERR_IF(!mdev->mbds_id) return FALSE;
 
-	bm_words = mdev->mbds_id->size/sizeof(unsigned long);
+	bm_words = mdev->mbds_id->size/sizeof(long);
 	bm = mdev->mbds_id->bm;
 	p  = vmalloc(PAGE_SIZE); // sleeps. cannot fail.
 	buffer = (unsigned long*)p->payload;
@@ -656,7 +633,7 @@
 	 */
 	do {
 		want=min_t(int,MBDS_PACKET_SIZE,(bm_words-bm_i)*sizeof(long));
-		for(buf_i=0;buf_i<want/sizeof(unsigned long);buf_i++)
+		for(buf_i=0;buf_i<want/sizeof(long);buf_i++)
 			buffer[buf_i] = cpu_to_lel(bm[bm_i++]);
 		ok = drbd_send_cmd(mdev,mdev->data.socket,ReportBitMap,
 				   p, sizeof(*p) + want);
@@ -728,26 +705,28 @@
 }
 
 /* called on sndtimeo
- * returns TRUE if we should retry,
- * FALSE if we think connection is dead,
- * or someone signaled us.
+ * returns FALSE if we should retry,
+ * TRUE if we think connection is dead
  */
-STATIC int drbd_retry_send(drbd_dev *mdev, struct socket *sock)
+STATIC int we_should_drop_the_connection(drbd_dev *mdev, struct socket *sock)
 {
-	long elapsed = (long)(jiffies - mdev->last_received);
-	DUMPLU(elapsed);
-	if ( signal_pending(current) || mdev->cstate <= WFConnection )
-		return FALSE;
-	if ( elapsed < mdev->conf.timeout*HZ/20 )
+	int drop_it;
+	// long elapsed = (long)(jiffies - mdev->last_received);
+	// DUMPLU(elapsed); // elapsed ignored for now.
+
+	if (mdev->meta.socket == sock || !mdev->asender.task)
 		return TRUE;
-	if ( current != mdev->asender.task ) {
-		// FIXME ko_count--
-		DBG("sock_sendmsg timed out, requesting ping\n");
+
+	drop_it = !--mdev->ko_count;
+	if ( !drop_it ) {
+		printk(KERN_ERR DEVICE_NAME
+		       "%d: [%s/%d] sock_sendmsg time expired, ko = %u\n",
+		       (int)(mdev-drbd_conf), current->comm, current->pid,
+		       mdev->ko_count);
 		request_ping(mdev);
-		return TRUE;
 	}
-	ERR("sock_sendmsg timed out, aborting connection\n");
-	return FALSE;
+
+	return drop_it; /* && (mdev->state == Primary) */;
 }
 
 int _drbd_send_page(drbd_dev *mdev, struct page *page,
@@ -755,7 +734,6 @@
 {
 	int sent,ok;
 	int len   = size;
-	int retry = 10;
 
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=current;
@@ -764,11 +742,10 @@
 	do {
 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page, offset, len, MSG_NOSIGNAL);
 		if (sent == -EAGAIN) {
-			// FIXME move "retry--" into drbd_retry_send()
-			if (drbd_retry_send(mdev,mdev->data.socket) && retry--)
-				continue;
-			else
+			if (we_should_drop_the_connection(mdev,mdev->data.socket))
 				break;
+			else
+				continue;
 		}
 		if (sent <= 0) {
 			WARN("%s: size=%d len=%d sent=%d\n",
@@ -796,8 +773,6 @@
 	int ok;
 	sigset_t old_blocked;
 	Drbd_Data_Packet p;
-	Drbd_Header ioh;
-
 
 	ERR_IF(!req || !req->master_bio) return FALSE;
 
@@ -847,11 +822,6 @@
 	ok =  (drbd_send(mdev,mdev->data.socket,&p,sizeof(p),MSG_MORE) == sizeof(p))
 	   && _drbd_send_zc_bio(mdev,&req->private_bio);
 
-	if(test_and_clear_bit(ISSUE_IO_HINT,&mdev->flags)) {
-		_drbd_send_cmd(mdev,mdev->data.socket,WriteHint,&ioh,
-			       sizeof(ioh),0);
-	}
-
 	spin_lock(&mdev->send_task_lock);
 	mdev->send_task=NULL;
 	spin_unlock(&mdev->send_task_lock);
@@ -919,7 +889,6 @@
 	struct msghdr msg;
 	struct iovec iov;
 	int rv,sent=0;
-	int retry = 10;
 
 	if (!sock) return -1000;
 	if (mdev->cstate < WFReportParams) return -1001;
@@ -940,6 +909,8 @@
 	oldfs = get_fs();
 	set_fs(KERNEL_DS);
 
+	if (sock == mdev->data.socket)
+		mdev->ko_count = 10; // FIXME conf.ko_count
 	do {
 		/* STRANGE
 		 * tcp_sendmsg does _not_ use its size parameter at all ?
@@ -952,15 +923,15 @@
  */
 		rv = sock_sendmsg(sock, &msg, iov.iov_len );
 		if (rv == -EAGAIN) {
-			// FIXME move "retry--" into drbd_retry_send()
-			if (drbd_retry_send(mdev,sock) && retry--)
-				continue;
-			else
+			if (we_should_drop_the_connection(mdev,sock))
 				break;
+			else
+				continue;
 		}
 		D_ASSERT(rv != 0);
 		if (rv == -EINTR ) {
-			ERR("Got a signal in drbd_send()!\n");
+			ERR("Got a signal in drbd_send(,%c,)!\n",
+			    sock == mdev->meta.socket ? 'm' : 's');
 			dump_stack();
 			drbd_flush_signals(current);
 			rv = 0;
@@ -1032,10 +1003,9 @@
 }
 
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
-STATIC void drbd_send_write_hint(void *data)
+STATIC void drbd_unplug_fn(void *data)
 {
 	struct Drbd_Conf* mdev = (drbd_dev*)data;
-	Drbd_Header h;
 	int i;
 
 	/* In case the receiver calls run_task_queue(&tq_disk) itself,
@@ -1043,12 +1013,6 @@
 	   secondary state), it could happen that it has to send the
 	   WRITE_HINT for an other device (which is in primary state).
 	   This could lead to a distributed deadlock!!
-
-	   To avoid the deadlock we set the ISSUE_IO_HINT bit and
-	   it will be sent after the current data block.
-	UPDATE:
-	   since "dontwait" this would no longer deadlock, but probably
-	   create a useless loop echoing WriteHints back and forth ...
 	 */
 
 	for (i = 0; i < minor_count; i++) {
@@ -1058,11 +1022,10 @@
 		}
 	}
 
-	if (drbd_send_cmd_dontwait(mdev,mdev->data.socket,WriteHint,&h,
-				   sizeof(h)) != 1){
-		set_bit(ISSUE_IO_HINT,&mdev->flags);
-	}
-	clear_bit(WRITE_HINT_QUEUED, &mdev->flags);
+	spin_lock_irq(&mdev->req_lock);
+	if (list_empty(&mdev->unplug_work.list))
+		_drbd_queue_work_front(&mdev->data.work,&mdev->unplug_work);
+	spin_unlock_irq(&mdev->req_lock);
 }
 #else
 
@@ -1070,32 +1033,31 @@
  * as 2.6.X moves on, we can probably drop it again.
  */
 #if LINUX_VERSION_CODE > KERNEL_VERSION(2,6,5)
-STATIC void drbd_send_write_hint(request_queue_t *q)
+STATIC void drbd_unplug_fn(request_queue_t *q)
 {
 #else
-STATIC void drbd_send_write_hint(void *data)
+STATIC void drbd_unplug_fn(void *data)
 {
 	request_queue_t *q = (request_queue_t*)data;
 #endif
 	drbd_dev *mdev = q->queuedata;
-	Drbd_Header h;
-
-	/* In order to avoid deadlocks the receiver should only
-	   use blk_run_queue(). It must not use blk_run_queues() to
-	   avoid deadlocks.
-
-	   In 2.6, we should use the plain drbd_send_cmd again.
-	*/
-
-	if (drbd_send_cmd_dontwait(mdev,mdev->data.socket,WriteHint,&h,
-				   sizeof(h)) != 1) {
-		set_bit(ISSUE_IO_HINT,&mdev->flags);
-	}
 
+	/* unplug FIRST */
 	spin_lock_irq(q->queue_lock);
 	blk_remove_plug(q);
 	spin_unlock_irq(q->queue_lock);
 
+	ERR_IF(mdev->state != Primary)
+		return;
+	/* add to the front of the data.work queue,
+         * unless already queued */
+	spin_lock_irq(&mdev->req_lock);
+	/* FIXME this might be a good addition to drbd_queu_work
+	 * anyways, to detect "double queuing" ... */
+	if (list_empty(&mdev->unplug_work.list))
+		_drbd_queue_work_front(&mdev->data.work,&mdev->unplug_work);
+	spin_unlock_irq(&mdev->req_lock);
+	drbd_kick_lo(mdev);
 }
 #endif
 
@@ -1125,7 +1087,6 @@
 	atomic_set(&mdev->local_cnt,0);
 	atomic_set(&mdev->resync_locked,0);
 
-	init_MUTEX(&mdev->device_mutex);
 	init_MUTEX(&mdev->md_io_mutex);
 	init_MUTEX(&mdev->data.mutex);
 	init_MUTEX(&mdev->meta.mutex);
@@ -1145,14 +1106,16 @@
 	INIT_LIST_HEAD(&mdev->done_ee);
 	INIT_LIST_HEAD(&mdev->read_ee);
 	INIT_LIST_HEAD(&mdev->busy_blocks);
-	INIT_LIST_HEAD(&mdev->new_app_reads);
+	INIT_LIST_HEAD(&mdev->app_reads);
 	INIT_LIST_HEAD(&mdev->resync_reads);
 	INIT_LIST_HEAD(&mdev->data.work.q);
 	INIT_LIST_HEAD(&mdev->meta.work.q);
 	INIT_LIST_HEAD(&mdev->resync_work.list);
 	INIT_LIST_HEAD(&mdev->barrier_work.list);
-	mdev->resync_work.cb = w_resync_inactive;
+	INIT_LIST_HEAD(&mdev->unplug_work.list);
+	mdev->resync_work.cb  = w_resync_inactive;
 	mdev->barrier_work.cb = w_try_send_barrier;
+	mdev->unplug_work.cb  = w_send_write_hint;
 	init_timer(&mdev->resync_timer);
 
 	init_waitqueue_head(&mdev->cstate_wait);
@@ -1164,7 +1127,7 @@
 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
 
 NOT_IN_26(
-	mdev->write_hint_tq.routine = &drbd_send_write_hint;
+	mdev->write_hint_tq.routine = &drbd_unplug_fn;
 	mdev->write_hint_tq.data    = mdev;
 )
 
@@ -1173,6 +1136,78 @@
 #endif
 }
 
+void drbd_mdev_cleanup(drbd_dev *mdev)
+{
+	/* I'd like to cleanup completely, and memset(,0,) it.
+	 * but I'd have to reinit it.
+	 * FIXME: do the right thing...
+	 */
+
+	/* list of things that may still
+	 * hold data of the previous config
+
+	 * act_log        ** re-initialized in set_disk
+	 * on_io_error
+
+	 * al_tr_cycle    ** re-initialized in ... FIXME??
+	 * al_tr_number
+	 * al_tr_pos
+
+	 * backing_bdev   ** re-initialized in drbd_free_ll_dev
+	 * lo_file
+	 * md_bdev 
+	 * md_file
+	 * md_index
+
+	 * ko_count       ** re-initialized in set_net
+
+	 * last_received  ** currently ignored
+
+	 * mbds_id        ** re-initialized in ... FIXME??
+
+	 * resync         ** re-initialized in ... FIXME??
+
+	*** no re-init necessary (?) ***
+	 * md_io_page
+	 * this_bdev
+
+	 * vdisk             ?
+
+	 * rq_queue       ** FIXME ASSERT ??
+	 * newest_barrier
+	 * oldest_barrier
+	 */
+
+	D_ASSERT(mdev->ee_in_use==0);
+	D_ASSERT(mdev->ee_vacant==32 /*EE_MININUM*/);
+	D_ASSERT(mdev->epoch_size==0);
+#define ZAP(x) memset(&x,0,sizeof(x))
+	ZAP(mdev->conf);
+	ZAP(mdev->sync_conf);
+	ZAP(mdev->data);
+	ZAP(mdev->meta);
+	ZAP(mdev->gen_cnt);
+#undef ZAP
+	mdev->al_writ_cnt  =
+	mdev->bm_writ_cnt  =
+	mdev->read_cnt     =
+	mdev->recv_cnt     =
+	mdev->send_cnt     =
+	mdev->writ_cnt     =
+	mdev->la_size      =
+	mdev->lo_usize     =
+	mdev->p_size       =
+	mdev->rs_start     =
+	mdev->rs_total     =
+	mdev->rs_left      =
+	mdev->rs_mark_left =
+	mdev->rs_mark_time = 0;
+	mdev->send_task    = NULL;
+	drbd_set_my_capacity(mdev,0);
+	drbd_init_set_defaults(mdev);
+}
+
+
 void drbd_destroy_mempools(void)
 {
 	if (drbd_request_mempool)
@@ -1249,7 +1284,7 @@
 			remove_proc_entry("drbd",&proc_root);
 		i=minor_count;
 		while (i--) {
-			drbd_dev        *mdev  = &drbd_conf[i];
+			drbd_dev        *mdev  = drbd_conf+i;
 ONLY_IN_26(
 			struct gendisk  **disk = &mdev->vdisk;
 			request_queue_t **q    = &mdev->rq_queue;
@@ -1439,7 +1474,7 @@
 		blk_queue_make_request(q,drbd_make_request_26);
 		q->queue_lock = &mdev->req_lock; // needed since we use
 		// plugging on a queue, that actually has no requests!
-		q->unplug_fn = drbd_send_write_hint;
+		q->unplug_fn = drbd_unplug_fn;
 	}
 #endif
 
@@ -1469,6 +1504,7 @@
 		if (!mdev->act_log) goto Enomem;
 
 		drbd_init_set_defaults(mdev);
+		init_MUTEX(&mdev->device_mutex);
 		if (!tl_init(mdev)) goto Enomem;
 		if (!drbd_init_ee(mdev)) goto Enomem;
 	}
@@ -1624,14 +1660,14 @@
 	}
 	memset(nbm,0,size);
 
-	spin_lock(&sbm->bm_lock);
+	spin_lock_irq(&sbm->bm_lock);
 	if(obm) {
 		memcpy(nbm,obm,min_t(unsigned long,sbm->size,size));
 	}
 	sbm->dev_size = size_kb;
 	sbm->size = size;
 	sbm->bm = nbm;
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irq(&sbm->bm_lock);
 
 	if(obm) vfree(obm);
 
@@ -1688,6 +1724,7 @@
 	unsigned long sbnr,ebnr,bnr;
 	sector_t esector = ( sector + (size>>9) - 1 );
 	int ret=0;
+	unsigned long flags;
 
 	if(sbm == NULL) {
 		printk(KERN_ERR DEVICE_NAME"X: No BitMap !?\n");
@@ -1700,11 +1737,24 @@
 	sbnr = sector >> BM_SS;
 	ebnr = esector >> BM_SS;
 
-	spin_lock(&sbm->bm_lock);
+	/*
+	INFO("bm_set_bit(,%lu,%d,%d) %lu %lu %lu ; %lu %lu\n",
+	     sector,size,bit, esector, sbnr,ebnr, sbm->size, sbm->dev_size);
+	*/
+
+	spin_lock_irqsave(&sbm->bm_lock,flags);
 	bm = sbm->bm;
 
 	if(bit) {
 		for(bnr=sbnr; bnr <= ebnr; bnr++) {
+			ERR_IF((bnr>>3) >= sbm->size) {
+				DUMPLU(sector);
+				DUMPI(size);
+				DUMPLU(bnr);
+				DUMPLU(sbm->size);
+				DUMPLU(sbm->dev_size);
+				break;
+			}
 			if(!test_bit(bnr&BPLM,bm+(bnr>>LN2_BPL))) ret+=BM_NS;
 			__set_bit(bnr & BPLM, bm + (bnr>>LN2_BPL));
 			ret += bm_end_of_dev_case(sbm);
@@ -1722,18 +1772,34 @@
 			// end of the device...
 			if(unlikely(dev_size<<1 == esector+1)) {
 				ebnr++;
-				if(test_bit(ebnr&BPLM,bm+(ebnr>>LN2_BPL))) {
+				ERR_IF((ebnr>>3) >= sbm->size) {
+					DUMPLU(sector);
+					DUMPI(size);
+					DUMPLU(ebnr);
+					DUMPLU(sbm->size);
+					DUMPLU(sbm->dev_size);
+				} else if(test_bit(ebnr&BPLM,bm+(ebnr>>LN2_BPL))) {
 					ret = (esector-sector+1)-BM_NS;
 				}
 			}
 		}
 
 		for(bnr=sbnr; bnr <= ebnr; bnr++) {
+			ERR_IF((bnr>>3) >= sbm->size) {
+				DUMPLU(sector);
+				DUMPI(size);
+				DUMPLU(bnr);
+				DUMPLU(sbnr);
+				DUMPLU(ebnr);
+				DUMPLU(sbm->size);
+				DUMPLU(sbm->dev_size);
+				break;
+			}
 			if(test_bit(bnr&BPLM,bm+(bnr>>LN2_BPL))) ret+=BM_NS;
 			clear_bit(bnr & BPLM, bm + (bnr>>LN2_BPL));
 		}
 	}
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irqrestore(&sbm->bm_lock,flags);
 
 	return ret;
 }
@@ -1783,9 +1849,10 @@
 int bm_count_sectors(struct BitMap* sbm, unsigned long enr)
 {
 	unsigned long* bm;
+	unsigned long flags;
 	int i,max,bits=0;
 
-	spin_lock(&sbm->bm_lock);
+	spin_lock_irqsave(&sbm->bm_lock,flags);
 	bm = sbm->bm;
 
 	max = min_t(int, (enr+1)*WORDS, sbm->size/sizeof(long));
@@ -1801,7 +1868,7 @@
 		bits += bm_end_of_dev_case(sbm);
 	}
 
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irqrestore(&sbm->bm_lock,flags);
 
 	return bits;
 }
@@ -1811,6 +1878,7 @@
 {
 	unsigned long* bm;
 	unsigned long sbnr,ebnr,bnr;
+	unsigned long flags;
 	sector_t esector = ( sector + (size>>9) - 1 );
 	int ret=0;
 
@@ -1822,7 +1890,7 @@
 	sbnr = sector >> BM_SS;
 	ebnr = esector >> BM_SS;
 
-	spin_lock(&sbm->bm_lock);
+	spin_lock_irqsave(&sbm->bm_lock,flags);
 	bm = sbm->bm;
 
 	for (bnr=sbnr; bnr <= ebnr; bnr++) {
@@ -1832,7 +1900,7 @@
 		}
 	}
 
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irqrestore(&sbm->bm_lock,flags);
 
 	return ret;
 }
@@ -1841,6 +1909,7 @@
 {
 	sector_t bnr;
 	unsigned long* bm;
+	unsigned long flags;
 	sector_t dev_size;
 	sector_t ret;
 
@@ -1850,7 +1919,7 @@
 		return MBDS_DONE;
 	}
 
-	spin_lock(&sbm->bm_lock);
+	spin_lock_irqsave(&sbm->bm_lock,flags);
 	bm = sbm->bm;
 	bnr = sbm->gs_bitnr;
 
@@ -1872,7 +1941,7 @@
 		sbm->gs_bitnr = bnr+1;
 	}
 
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irqrestore(&sbm->bm_lock,flags);
 
 	return ret;
 }
@@ -1880,8 +1949,9 @@
 int bm_is_rs_done(struct BitMap* sbm)
 {
 	int rv=0;
+	unsigned long flags;
 
-	spin_lock(&sbm->bm_lock);
+	spin_lock_irqsave(&sbm->bm_lock,flags);
 
 	if( (sbm->gs_bitnr<<BM_SS) + ((1<<BM_SS)-1) > sbm->dev_size<<1) {
 		int ns = sbm->dev_size % (1<<(BM_BLOCK_SIZE_B-10));
@@ -1891,18 +1961,17 @@
 		}
 	}
 
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irqrestore(&sbm->bm_lock,flags);
 
 	return rv;
 }
 
 void bm_reset(struct BitMap* sbm)
 {
-	spin_lock(&sbm->bm_lock);
-
+	unsigned long flags;
+	spin_lock_irqsave(&sbm->bm_lock,flags);
 	sbm->gs_bitnr=0;
-
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irqrestore(&sbm->bm_lock,flags);
 }
 
 
@@ -1910,8 +1979,9 @@
 {
 	unsigned long* bm;
 	unsigned long bnr,o;
+	unsigned long flags;
 
-	spin_lock(&sbm->bm_lock);
+	spin_lock_irqsave(&sbm->bm_lock,flags);
 	bm = sbm->bm;
 
 	memset(bm,value,sbm->size);
@@ -1923,7 +1993,7 @@
 		bm[ o ] &= ( ( 1 << (bnr % BITS_PER_LONG) ) - 1 );
 	}
 
-	spin_unlock(&sbm->bm_lock);
+	spin_unlock_irqrestore(&sbm->bm_lock,flags);
 }
 
 /*********************************/
@@ -1939,6 +2009,11 @@
 	u32 bm_offset;         // offset to the bitmap, from here
 };
 
+/*
+
+FIXME md_io might fail unnoticed
+
+*/
 void drbd_md_write(drbd_dev *mdev)
 {
 	struct meta_data_on_disk * buffer;
@@ -1968,9 +2043,10 @@
 	buffer->bm_offset = __constant_cpu_to_be32(MD_BM_OFFSET);
 
 	kunmap(mdev->md_io_page);
-	
+
 	sector = drbd_md_ss(mdev) + MD_GC_OFFSET;
 
+	/* FIXME what if this fails ?? */
 	drbd_md_sync_page_io(mdev,sector,WRITE);
 	mdev->la_size = drbd_get_capacity(mdev->this_bdev)>>1;
 
@@ -1987,12 +2063,13 @@
 	if(!inc_local_md_only(mdev)) return -1;
 
 	down(&mdev->md_io_mutex);
+	buffer = (struct meta_data_on_disk *)kmap(mdev->md_io_page);
 
 	sector = drbd_md_ss(mdev) + MD_GC_OFFSET;
 
-	ERR_IF( ! drbd_md_sync_page_io(mdev,sector,READ) ) goto err;
+/* FIXME different failure cases: IO error or invalid magic */
 
-	buffer = (struct meta_data_on_disk *)kmap(mdev->md_io_page);
+	ERR_IF( ! drbd_md_sync_page_io(mdev,sector,READ) ) goto err;
 
 	if(be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) goto err;
 
@@ -2004,7 +2081,7 @@
 	kunmap(mdev->md_io_page);
 	up(&mdev->md_io_mutex);
 	dec_local(mdev);
-	
+
 	return 1;
 
  err:
@@ -2017,7 +2094,9 @@
 	for(i=HumanCnt;i<=ArbitraryCnt;i++) mdev->gen_cnt[i]=1;
 	mdev->gen_cnt[Flags]=MDF_Consistent;
 
+/* FIXME might have IO errors! */
 	drbd_md_write(mdev);
+
 	return 0;
 }
 
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_receiver.c,v
retrieving revision 1.97.2.133
retrieving revision 1.97.2.134
diff -u -3 -r1.97.2.133 -r1.97.2.134
--- drbd_receiver.c	21 Apr 2004 21:36:17 -0000	1.97.2.133
+++ drbd_receiver.c	24 Apr 2004 16:35:21 -0000	1.97.2.134
@@ -214,7 +214,7 @@
 {
 	while(mdev->ee_vacant < EE_MININUM ) {
 		if(!drbd_alloc_ee(mdev,GFP_USER)) {
-			ERR("Failed to allocate %d EEs !",EE_MININUM);
+			ERR("Failed to allocate %d EEs !\n",EE_MININUM);
 			return 0;
 		}
 	}
@@ -330,7 +330,7 @@
 
 	if( test_and_set_bit(PROCESS_EE_RUNNING,&mdev->flags) ) {
 		spin_unlock_irq(&mdev->ee_lock);
-		got_sig = wait_event_interruptible(mdev->ee_wait, 
+		got_sig = wait_event_interruptible(mdev->ee_wait,
 		       test_and_set_bit(PROCESS_EE_RUNNING,&mdev->flags) == 0);
 		spin_lock_irq(&mdev->ee_lock);
 		if(got_sig) return 2;
@@ -648,6 +648,18 @@
 
 	set_cstate(mdev,WFReportParams);
 
+	/* in case one of the other threads said: restart_nowait(receiver),
+	 * it may still hang around itself.  make sure threads are
+	 * really stopped before trying to restart them.
+	 * drbd_disconnect should have taken care of that, but I still
+	 * get these "resync inactive, but callback triggered".
+	 *
+	 * and I saw "connection lost... established", and no more
+	 * worker thread :(
+	 */
+	D_ASSERT(mdev->asender.task == NULL);
+	D_ASSERT(mdev->worker.task == NULL);
+
 	drbd_thread_start(&mdev->asender);
 	drbd_thread_start(&mdev->worker);
 
@@ -666,6 +678,7 @@
 		ERR("short read expecting header on sock: r=%d\n",r);
 		return FALSE;
 	};
+	dump_packet(mdev,mdev->data.socket,1,(void*)h);
 	h->command = be16_to_cpu(h->command);
 	h->length  = be16_to_cpu(h->length);
 	if (unlikely( h->magic != BE_DRBD_MAGIC )) {
@@ -908,7 +921,7 @@
 	if(mdev->conf.wire_protocol == DRBD_PROT_C) {
 		if(likely(drbd_bio_uptodate(&e->private_bio))) {
 			ok=drbd_send_ack(mdev,WriteAck,e);
-			if(ok && mdev->rs_left) 
+			if(ok && mdev->rs_left)
 				drbd_set_in_sync(mdev,sector,drbd_ee_get_size(e));
 		} else {
 			ok = drbd_send_ack(mdev,NegAck,e);
@@ -1241,7 +1254,7 @@
 	int ok=FALSE, bm_i=0;
 	unsigned long bits=0;
 
-	bm_words=mdev->mbds_id->size/sizeof(unsigned long);
+	bm_words=mdev->mbds_id->size/sizeof(long);
 	bm=mdev->mbds_id->bm;
 	buffer=vmalloc(MBDS_PACKET_SIZE);
 
@@ -1251,7 +1264,7 @@
 		if (want==0) break;
 		if (drbd_recv(mdev, buffer, want) != want)
 			goto out;
-		for(buf_i=0;buf_i<want/sizeof(unsigned long);buf_i++) {
+		for(buf_i=0;buf_i<want/sizeof(long);buf_i++) {
 			word = lel_to_cpu(buffer[buf_i]) | bm[bm_i];
 			bits += hweight_long(word);
 			bm[bm_i++] = word;
@@ -1295,9 +1308,10 @@
 	 * Application READ requests
 	 */
 	spin_lock(&mdev->pr_lock);
-	list_add(&workset,&mdev->new_app_reads);
-	list_del(&mdev->new_app_reads);
-	INIT_LIST_HEAD(&mdev->new_app_reads);
+	// FIXME use list_splice_init
+	list_add(&workset,&mdev->app_reads);
+	list_del(&mdev->app_reads);
+	INIT_LIST_HEAD(&mdev->app_reads);
 	spin_unlock(&mdev->pr_lock);
 
 	while(!list_empty(&workset)) {
@@ -1730,9 +1744,14 @@
 				goto err;
 			}
 			expect = asender_tbl[cmd].pkt_size;
+			ERR_IF(len != expect-sizeof(Drbd_Header)) {
+				dump_packet(mdev,mdev->meta.socket,1,(void*)h);
+				DUMPI(expect);
+			}
 		}
 		if(received == expect) {
 			D_ASSERT(cmd != -1);
+			dump_packet(mdev,mdev->meta.socket,1,(void*)h);
 			if(!asender_tbl[cmd].process(mdev,h)) goto err;
 
 			buf      = h;
===================================================================
RCS file: /var/lib/cvs/drbd/drbd/drbd/drbd_req-2.4.c,v
retrieving revision 1.33.2.65
retrieving revision 1.33.2.66
diff -u -3 -r1.33.2.65 -r1.33.2.66
--- drbd_req-2.4.c	15 Apr 2004 12:29:02 -0000	1.33.2.65
+++ drbd_req-2.4.c	24 Apr 2004 16:35:22 -0000	1.33.2.66
@@ -120,7 +120,7 @@
 
 	req->w.cb = w_is_app_read;
 	spin_lock(&mdev->pr_lock);
-	list_add(&req->w.list,&mdev->new_app_reads);
+	list_add(&req->w.list,&mdev->app_reads);
 	spin_unlock(&mdev->pr_lock);
 	inc_ap_pending(mdev);
 #if LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)
@@ -198,6 +198,13 @@
 				 * it, then continue locally.
 				 * Or just issue the request remotely.
 				 */
+/* FIXME I think we have a RACE here
+ * we request it remotely, then later some write starts ...
+ * and finished *before* the answer to the read comes in,
+ * because the ACK for the WRITE goes over meta-socket ...
+ * I think we need to properly lock reads against the syncer, too.
+ */
+
 				local = 0;
 				dec_local(mdev);
 			}
@@ -216,12 +223,6 @@
 		return 0;
 	}
 
-	/* THINK
-	 * maybe we need to
-	 *   if (rw == WRITE) drbd_al_begin_io(mdev, sector);
-	 * right here already?
-	 */
-
 	/* do this first, so I do not need to call drbd_end_req,
 	 * but can set the rq_status directly.
 	 */
@@ -230,6 +231,20 @@
 	if (!remote)
 		req->rq_status |= RQ_DRBD_SENT;
 
+	/* THINK
+	 * maybe we need to
+	 *   if (rw == WRITE) drbd_al_begin_io(mdev, sector);
+	 * right here already?
+	 */
+
+	if (rw == WRITE && local)
+		drbd_al_begin_io(mdev, sector);
+
+	/* since we possibly waited, we have a race: mdev may have
+	 * changed underneath us. Thats why I want to have a read lock
+	 * on it, and every state change of mdev needs to be done with a
+	 * write lock on it! */
+
 	if (remote) {
 		/* either WRITE and Connected,
 		 * or READ, and no local disk,
@@ -258,7 +273,6 @@
 
 	if (local) {
 		if (rw == WRITE) {
-			drbd_al_begin_io(mdev, sector);
 			if (!remote) drbd_set_out_of_sync(mdev,sector,size);
 		} else {
 			D_ASSERT(!remote);


