[RFC] vmbus: lockless ring write

Stephen Hemminger stephen at networkplumber.org
Sat Jan 28 00:01:37 UTC 2017


On Fri, 27 Jan 2017 18:08:35 +0000
KY Srinivasan <kys at microsoft.com> wrote:

> > -----Original Message-----
> > From: Stephen Hemminger [mailto:stephen at networkplumber.org]
> > Sent: Monday, January 23, 2017 5:40 PM
> > To: KY Srinivasan <kys at microsoft.com>; Haiyang Zhang
> > <haiyangz at microsoft.com>
> > Cc: devel at linuxdriverproject.org; Stephen Hemminger
> > <sthemmin at microsoft.com>
> > Subject: [PATCH 07/14] vmbus: remove conditional locking of vmbus_write
> > 
> > All current usage of vmbus write uses the acquire_lock flag, therefore
> > having it be optional is unnecessary. This also fixes a sparse warning
> > since sparse doesn't like when a function has conditional locking.  
> 
> In order to avoid cross-tree dependency, I got this functionality into Greg's
> tree first. I plan to submit patches to netvsc that will use this functionality.
> As you know, we hold a higher level lock in the protocol stack when we send on
> a sub-channel. This will avoid an unnecessary spin lock round trip in the data path.
> 
> Regards,
> 
> K. Y
>

The following (untested) patch changes the VMBUS ring buffer to use a lockless update policy. A minimal userspace model of the idea is sketched below, followed by the patch itself.
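
In a nutshell, each writer (1) atomically reserves room by advancing a
private write index with cmpxchg, (2) copies its data into the reserved
region, and (3) publishes the data by advancing the shared write index,
in reservation order. Here is a minimal, untested C11 userspace model
of that scheme; the names (struct ring, ring_write) are invented for
the sketch, and the memcpy-based copy stands in for the vmbus helpers:

#include <stdatomic.h>
#include <stdio.h>
#include <string.h>

#define RING_SIZE 4096u

struct ring {
	_Atomic unsigned int reserve;	/* like priv_write_index */
	_Atomic unsigned int write;	/* like ring_buffer->write_index */
	_Atomic unsigned int read;	/* advanced by the consumer */
	unsigned char data[RING_SIZE];
};

/* Returns 0 on success, -1 if there is not enough room. */
static int ring_write(struct ring *r, const void *buf, unsigned int len)
{
	unsigned int old, next;

	/* Step 1: atomically reserve len bytes of ring space. */
	do {
		old = atomic_load_explicit(&r->reserve,
					   memory_order_relaxed);
		unsigned int rd = atomic_load_explicit(&r->read,
						       memory_order_relaxed);
		unsigned int avail = (old >= rd) ? RING_SIZE - (old - rd)
						 : rd - old;

		/* Keep one byte free so full is distinct from empty. */
		if (len >= avail)
			return -1;

		next = old + len;
		if (next >= RING_SIZE)
			next -= RING_SIZE;
	} while (!atomic_compare_exchange_weak_explicit(&r->reserve,
			&old, next,
			memory_order_relaxed, memory_order_relaxed));

	/* Step 2: copy into the reserved region, handling wrap-around. */
	unsigned int first = (len <= RING_SIZE - old) ? len
						      : RING_SIZE - old;
	memcpy(r->data + old, buf, first);
	memcpy(r->data, (const unsigned char *)buf + first, len - first);

	/*
	 * Step 3: publish in reservation order.  Writers that reserved
	 * earlier must publish first, so spin until the shared write
	 * index reaches our old offset, then advance it.  Release
	 * ordering makes the data visible before the new index.
	 */
	unsigned int expected = old;
	while (!atomic_compare_exchange_weak_explicit(&r->write,
			&expected, next,
			memory_order_release, memory_order_relaxed))
		expected = old;	/* CAS wrote back the current value */

	return 0;
}

int main(void)
{
	static struct ring r;
	const char msg[] = "hello";

	if (ring_write(&r, msg, sizeof(msg)) == 0)
		printf("wrote %zu bytes at offset 0\n", sizeof(msg));
	return 0;
}

Note the model leaves at least one byte free, so read == write can only
mean "empty"; the patch below keeps the same rule.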

From b4a0ff829fe617aacb5bcea44eb473692f1180d2 Mon Sep 17 00:00:00 2001
From: Stephen Hemminger <sthemmin at microsoft.com>
Date: Fri, 27 Jan 2017 15:46:48 -0800
Subject: [PATCH] vmbus: implement lock-less ring buffer

Use a reservation scheme similar to ftrace to make vmbus ring buffer
writes lock-free. Other similar implementations are FreeBSD's buf_ring
and DPDK's rte_ring.

The algorithm uses cmpxchg to atomically reserve an area in the ring
buffer. The data is then copied into the reserved area, and the update
of the write index that publishes it is ordered after the copy.

Signed-off-by: Stephen Hemminger <sthemmin at microsoft.com>
---
 drivers/hv/hyperv_vmbus.h |   2 +-
 drivers/hv/ring_buffer.c  | 120 +++++++++++++++++++++++++++++-----------------
 include/linux/hyperv.h    |   1 -
 3 files changed, 76 insertions(+), 47 deletions(-)

diff --git a/drivers/hv/hyperv_vmbus.h b/drivers/hv/hyperv_vmbus.h
index 08a294c968ee..884f83bba1ab 100644
--- a/drivers/hv/hyperv_vmbus.h
+++ b/drivers/hv/hyperv_vmbus.h
@@ -283,7 +283,7 @@ int hv_ringbuffer_init(struct hv_ring_buffer_info *ring_info,
 void hv_ringbuffer_cleanup(struct hv_ring_buffer_info *ring_info);
 
 int hv_ringbuffer_write(struct vmbus_channel *channel,
-			struct kvec *kv_list, u32 kv_count);
+			const struct kvec *kv_list, u32 kv_count);
 
 int hv_ringbuffer_read(struct vmbus_channel *channel,
 		       void *buffer, u32 buflen, u32 *buffer_actual_len,
diff --git a/drivers/hv/ring_buffer.c b/drivers/hv/ring_buffer.c
index 75029f4a2c14..ffaf631b0e41 100644
--- a/drivers/hv/ring_buffer.c
+++ b/drivers/hv/ring_buffer.c
@@ -213,9 +213,6 @@ int hv_ringbuffer_init(struct hv_ring_buffer_info *ring_info,
 	if (!ring_info->ring_buffer)
 		return -ENOMEM;
 
-	ring_info->ring_buffer->read_index =
-		ring_info->ring_buffer->write_index = 0;
-
 	/* Set the feature bit for enabling flow control. */
 	ring_info->ring_buffer->feature_bits.value = 1;
 
@@ -223,8 +220,6 @@ int hv_ringbuffer_init(struct hv_ring_buffer_info *ring_info,
 	ring_info->ring_datasize = ring_info->ring_size -
 		sizeof(struct hv_ring_buffer);
 
-	spin_lock_init(&ring_info->ring_lock);
-
 	return 0;
 }
 
@@ -234,70 +229,105 @@ void hv_ringbuffer_cleanup(struct hv_ring_buffer_info *ring_info)
 	vunmap(ring_info->ring_buffer);
 }
 
-/* Write to the ring buffer. */
+/*
+ * Multiple-producer lock-free ring buffer enqueue
+ *
+ * If two CPUs are updating the ring at the same time, then each
+ * will have reserved a different pair of offsets:
+ *
+ *    ------------------------------
+ *       | CPU 1     | CPU 2   |
+ *       |  data     |  data   |
+ *    ------------------------------
+ *       ^           ^         ^
+ *       |           |         |
+ *     old_wr(1)   next_wr(1)  |
+ *                 old_wr(2)   next_wr(2)
+ *
+ * ring_buffer_info->priv_write_index is the offset where the
+ * reserving CPU should store its data in the ring.
+ *
+ * ring_buffer->write_index is the upper limit of where the host
+ * can see valid data in the ring.
+ *
+ */
 int hv_ringbuffer_write(struct vmbus_channel *channel,
-			struct kvec *kv_list, u32 kv_count)
+			const struct kvec *kv_list, u32 kv_count)
 {
-	int i;
-	u32 bytes_avail_towrite;
-	u32 totalbytes_towrite = sizeof(u64);
-	u32 next_write_location;
-	u32 old_write;
-	u64 prev_indices;
-	unsigned long flags;
 	struct hv_ring_buffer_info *outring_info = &channel->outbound;
+	const u32 ring_size = outring_info->ring_datasize;
+	u32 write_location, next_write_location, write_offset;
+	u32 totalbytes_towrite;
+	u64 prev_indices;
+	unsigned int i;
 
-	if (channel->rescind)
+	if (unlikely(channel->rescind))
 		return -ENODEV;
 
+	totalbytes_towrite = sizeof(u64);
 	for (i = 0; i < kv_count; i++)
 		totalbytes_towrite += kv_list[i].iov_len;
 
-	spin_lock_irqsave(&outring_info->ring_lock, flags);
+	/* A writer preempted between reserve and publish stalls later writers */
+	preempt_disable();
 
-	bytes_avail_towrite = hv_get_bytes_to_write(outring_info);
+	/* Atomically reserve space by advancing the private write index */
+	do {
+		u32 read_location;
+		u32 bytes_avail;
 
-	/*
-	 * If there is only room for the packet, assume it is full.
-	 * Otherwise, the next time around, we think the ring buffer
-	 * is empty since the read index == write index.
-	 */
-	if (bytes_avail_towrite <= totalbytes_towrite) {
-		spin_unlock_irqrestore(&outring_info->ring_lock, flags);
-		return -EAGAIN;
-	}
+		write_location = READ_ONCE(outring_info->priv_write_index);
+		prev_indices = (u64)write_location << 32;
+
+		read_location = READ_ONCE(outring_info->ring_buffer->read_index);
+		bytes_avail = (write_location >= read_location)
+			? ring_size - (write_location - read_location)
+			: read_location - write_location;
 
-	/* Write to the ring buffer */
-	next_write_location = outring_info->ring_buffer->write_index;
+		/* Leave one byte free so a full ring is distinct from empty */
+		if (totalbytes_towrite >= bytes_avail) {
+			preempt_enable();
+			return -EAGAIN;
+		}
 
-	old_write = outring_info->ring_buffer->write_index;
+		next_write_location = write_location + totalbytes_towrite;
+		if (next_write_location >= ring_size)
+			next_write_location -= ring_size;
 
+	} while (cmpxchg(&outring_info->priv_write_index,
+			 write_location, next_write_location) != write_location);
+
+	/* Copy new data into place */
+	write_offset = write_location;
 	for (i = 0; i < kv_count; i++) {
-		next_write_location = hv_copyto_ringbuffer(outring_info,
-						     next_write_location,
-						     kv_list[i].iov_base,
-						     kv_list[i].iov_len);
+		write_offset = hv_copyto_ringbuffer(outring_info, write_offset,
+						    kv_list[i].iov_base, kv_list[i].iov_len);
 	}
 
 	/* Set previous packet start */
-	prev_indices = (u64)outring_info->ring_buffer->write_index << 32;
-	next_write_location = hv_copyto_ringbuffer(outring_info,
-					     next_write_location,
-					     &prev_indices,
-					     sizeof(u64));
+	hv_copyto_ringbuffer(outring_info, write_offset,
+			     &prev_indices, sizeof(u64));
 
 	/* Issue a full memory barrier before updating the write index */
 	virt_mb();
 
-	/* Now, update the write location */
-	outring_info->ring_buffer->write_index = next_write_location;
-
-	spin_unlock_irqrestore(&outring_info->ring_lock, flags);
+	/*
+	 * Publish the new write index atomically, in reservation order.
+	 * If this write raced with another guest CPU, wait our turn.
+	 */
+	while (cmpxchg(&outring_info->ring_buffer->write_index,
+		       write_location, next_write_location) != write_location) {
+		/* Preemption is disabled, so spin rather than yield() */
+		cpu_relax();
 
-	hv_signal_on_write(old_write, channel);
+		if (channel->rescind) {
+			preempt_enable();
+			return -ENODEV;
+		}
+	}
 
-	if (channel->rescind)
-		return -ENODEV;
+	hv_signal_on_write(write_location, channel);
+	preempt_enable();
 
 	return 0;
 }
diff --git a/include/linux/hyperv.h b/include/linux/hyperv.h
index 79d18f4dea24..b18924d9dd48 100644
--- a/include/linux/hyperv.h
+++ b/include/linux/hyperv.h
@@ -121,7 +121,6 @@ struct hv_ring_buffer {
 struct hv_ring_buffer_info {
 	struct hv_ring_buffer *ring_buffer;
 	u32 ring_size;			/* Include the shared header */
-	spinlock_t ring_lock;
 
 	u32 ring_datasize;		/* < ring_size */
 	u32 ring_data_startoffset;
-- 
2.11.0


