I have finished adding some preliminary support for "blocking regions" to taco. The idea here is to tag regions of a task that are performing high-latency, low-CPU-overhead work - like, say, blocking network i/o. With these regions appropriately tagged, we can migrate the fiber they are executing on to a dedicated i/o thread, freeing the current thread to continue processing tasks. Once the blocking region is exited, the worker fiber migrates back to the task execution threads.

Internally, taco maintains a list of inactive "blocker" threads. When a task wants to enter one of these regions, it calls taco::BeginBlocking. This yields execution on the current task thread to the next appropriate worker fiber, while posting the current worker fiber to one of the inactive blocker threads and signalling it awake. When the task is finished with its blocking work it calls taco::EndBlocking, which puts the blocker thread back to sleep on the inactive list and schedules the fiber back onto one of the task threads.
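
To sketch that flow in code, in the same pseudocode style as the worker and Switch snippets later on this page (the blocker-specific helpers here are placeholders, not taco's actual internals):

void BeginBlocking()
{
	fiber current = FiberCurrent();
	fiber next = HasScheduledTasks() ? GetInactiveWorker() : NextWorker();
	YieldTo(next, [=]() -> void {
		// Runs only after we have switched away from 'current':
		// post the fiber to an inactive blocker thread and signal it awake.
		PostToBlockerThread(current);
	});
}

void EndBlocking()
{
	fiber current = FiberCurrent();
	YieldTo(BlockerIdleFiber(), [=]() -> void {
		// The blocker thread goes back onto the inactive list and to sleep,
		// while the fiber gets rescheduled onto the task threads.
		ScheduleFiber(current);
	});
}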

Simple enough in concept, but how well does it work? I went ahead and coded up a simple artificial test that enters one of these blocking regions, sleeps for some amount of time, and then does some CPU work:

void test_blocking(bool blocking, unsigned ntasks, unsigned sleeptime, unsigned cputime)
{
	taco::future<uint32_t> * tasks = new taco::future<uint32_t>[ntasks];
	for (unsigned i=0; i<ntasks; i++)
	{
		tasks[i] = taco::Start([=]() -> uint32_t {
			if (blocking) { taco::BeginBlocking(); }

			std::this_thread::sleep_for(std::chrono::microseconds(sleeptime));

			if (blocking) { taco::EndBlocking(); }

			unsigned steps = 0;
			for (unsigned j=0; j<cputime; j++)
			{
			    // collatz
				unsigned n = 837799;
				while (n != 1)
				{
					n = (n & 1) ? ((3 * n) + 1) : (n / 2);
					steps++;
				}
			}
			return steps;
		});
	}
	
	for (unsigned i=1; i<ntasks; i++)
	{
		BASIS_TEST_VERIFY_MSG(tasks[i] == tasks[i - 1], "Mismatch between %u and %u (%u vs %u)", 
			i, i - 1, (uint32_t)tasks[i], (uint32_t)tasks[i - 1]);
	}

	delete [] tasks;
}

Okay, so it is a completely silly test, but hopefully it approximates some vision of what this blocking facility could be used for. The test setup measures the runtime of this function over a range of task counts, sleep durations, and amounts of CPU work; it does this both with the blocking region enabled and with it disabled, and reports the relative speedup.
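
The driver itself is essentially just a set of nested loops over those parameters, timing each configuration in both modes; a rough sketch of it (the real harness differs in the details and assumes the library has already been initialized):

#include <chrono>
#include <cstdio>

// Hypothetical driver: times test_blocking with the blocking region enabled and
// disabled for each parameter combination and reports the relative speedup.
static double time_ms(bool blocking, unsigned ntasks, unsigned sleeptime, unsigned cputime)
{
	auto start = std::chrono::high_resolution_clock::now();
	test_blocking(blocking, ntasks, sleeptime, cputime);
	auto end = std::chrono::high_resolution_clock::now();
	return std::chrono::duration<double, std::milli>(end - start).count();
}

void run_blocking_benchmark()
{
	for (unsigned ntasks = 64; ntasks <= 512; ntasks *= 2)
	for (unsigned cputime = 64; cputime <= 512; cputime *= 2)
	for (unsigned sleeptime = 1024; sleeptime <= 16384; sleeptime *= 2)
	{
		double enabled  = time_ms(true, ntasks, sleeptime, cputime);
		double disabled = time_ms(false, ntasks, sleeptime, cputime);
		printf("%u, %u, %u, %.0f, %.0f, %.2f\n",
			ntasks, cputime, sleeptime, enabled, disabled, disabled / enabled);
	}
}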

Task Count | CPU (iterations) | Sleep (µs) | Enabled (ms) | Disabled (ms) | Relative Speedup
64 | 64 | 1024 | 28 | 240 | 8.57
64 | 64 | 2048 | 34 | 246 | 7.24
64 | 64 | 4096 | 33 | 246 | 7.45
64 | 64 | 8192 | 30 | 251 | 8.37
64 | 64 | 16384 | 48 | 497 | 10.35
64 | 128 | 1024 | 44 | 250 | 5.68
64 | 128 | 2048 | 33 | 247 | 7.48
64 | 128 | 4096 | 38 | 242 | 6.37
64 | 128 | 8192 | 42 | 238 | 5.67
64 | 128 | 16384 | 52 | 496 | 9.54
64 | 256 | 1024 | 68 | 241 | 3.54
64 | 256 | 2048 | 44 | 251 | 5.70
64 | 256 | 4096 | 58 | 237 | 4.09
64 | 256 | 8192 | 45 | 252 | 5.60
64 | 256 | 16384 | 89 | 501 | 5.63
64 | 512 | 1024 | 83 | 246 | 2.96
64 | 512 | 2048 | 124 | 250 | 2.02
64 | 512 | 4096 | 79 | 251 | 3.18
64 | 512 | 8192 | 121 | 252 | 2.08
64 | 512 | 16384 | 145 | 494 | 3.41
128 | 64 | 1024 | 28 | 495 | 17.68
128 | 64 | 2048 | 48 | 497 | 10.35
128 | 64 | 4096 | 48 | 497 | 10.35
128 | 64 | 8192 | 48 | 497 | 10.35
128 | 64 | 16384 | 64 | 997 | 15.58
128 | 128 | 1024 | 69 | 492 | 7.13
128 | 128 | 2048 | 75 | 500 | 6.67
128 | 128 | 4096 | 71 | 491 | 6.92
128 | 128 | 8192 | 75 | 501 | 6.68
128 | 128 | 16384 | 82 | 996 | 12.15
128 | 256 | 1024 | 95 | 495 | 5.21
128 | 256 | 2048 | 80 | 496 | 6.20
128 | 256 | 4096 | 124 | 499 | 4.02
128 | 256 | 8192 | 96 | 495 | 5.16
128 | 256 | 16384 | 146 | 994 | 6.81
128 | 512 | 1024 | 173 | 499 | 2.88
128 | 512 | 2048 | 233 | 499 | 2.14
128 | 512 | 4096 | 172 | 497 | 2.89
128 | 512 | 8192 | 236 | 496 | 2.10
128 | 512 | 16384 | 257 | 1002 | 3.90
256 | 64 | 1024 | 50 | 992 | 19.84
256 | 64 | 2048 | 65 | 995 | 15.31
256 | 64 | 4096 | 76 | 999 | 13.14
256 | 64 | 8192 | 63 | 997 | 15.83
256 | 64 | 16384 | 91 | 1997 | 21.95
256 | 128 | 1024 | 99 | 993 | 10.03
256 | 128 | 2048 | 98 | 993 | 10.13
256 | 128 | 4096 | 127 | 995 | 7.83
256 | 128 | 8192 | 130 | 993 | 7.64
256 | 128 | 16384 | 146 | 1991 | 13.64
256 | 256 | 1024 | 237 | 994 | 4.19
256 | 256 | 2048 | 242 | 989 | 4.09
256 | 256 | 4096 | 237 | 994 | 4.19
256 | 256 | 8192 | 236 | 995 | 4.22
256 | 256 | 16384 | 261 | 2001 | 7.67
256 | 512 | 1024 | 467 | 1000 | 2.14
256 | 512 | 2048 | 459 | 991 | 2.16
256 | 512 | 4096 | 258 | 1005 | 3.90
256 | 512 | 8192 | 263 | 999 | 3.80
256 | 512 | 16384 | 378 | 1996 | 5.28
512 | 64 | 1024 | 93 | 1987 | 21.37
512 | 64 | 2048 | 96 | 1994 | 20.77
512 | 64 | 4096 | 83 | 1990 | 23.98
512 | 64 | 8192 | 99 | 1990 | 20.10
512 | 64 | 16384 | 118 | 3984 | 33.76
512 | 128 | 1024 | 240 | 1989 | 8.29
512 | 128 | 2048 | 178 | 1990 | 11.18
512 | 128 | 4096 | 247 | 1997 | 8.09
512 | 128 | 8192 | 192 | 1991 | 10.37
512 | 128 | 16384 | 261 | 3983 | 15.26
512 | 256 | 1024 | 350 | 1989 | 5.68
512 | 256 | 2048 | 334 | 1989 | 5.96
512 | 256 | 4096 | 262 | 1999 | 7.63
512 | 256 | 8192 | 342 | 1996 | 5.84
512 | 256 | 16384 | 355 | 3996 | 11.26
512 | 512 | 1024 | 933 | 2002 | 2.15
512 | 512 | 2048 | 526 | 2000 | 3.80
512 | 512 | 4096 | 644 | 1991 | 3.09
512 | 512 | 8192 | 642 | 1993 | 3.10
512 | 512 | 16384 | 933 | 3999 | 4.29

With a maximum speedup of nearly 34x, I would say that is a pretty good improvement. Of course, this is just an artificial test, and putting a thread to sleep for some requested amount of time isn't exactly a precise thing. Perhaps a more realistic test would provide more interesting results. Still, this gives me hope that I am working in a good direction. The results also make me think about some other things to test; for example, right now every fiber that blocks will potentially spin up a new thread - what if instead there were a limited pool? Something to experiment with in the future, I think.

Coding: taco continued

I've been spending a little more time working on taco lately and thought it would be good to spend some time writing out a little more about how the internals work.

As I mentioned before, the intent of taco is to create a coroutine based task processing library. On Windows these coroutines are just a basic wrapper around the system fiber support. For other platforms I am building them on top of the POSIX ucontext functionality and I employ this little bit of code with setjmp/longjmp to speed things up. With this as a basis the library is able to suspend tasks and then resume them at any point in the future, retaining their state of execution (i.e. these are not stackless coroutines).
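
As a quick standalone illustration of the ucontext half of that - just the basic mechanism, without the setjmp/longjmp fast path, and not taco's actual code:

#include <ucontext.h>
#include <cstdio>

static ucontext_t main_ctx, co_ctx;
static char co_stack[64 * 1024];

static void co_entry()
{
	printf("inside the coroutine\n");
	swapcontext(&co_ctx, &main_ctx);	// suspend here, yield back to main
	printf("resumed where we left off\n");
}

int main()
{
	getcontext(&co_ctx);
	co_ctx.uc_stack.ss_sp = co_stack;
	co_ctx.uc_stack.ss_size = sizeof(co_stack);
	co_ctx.uc_link = &main_ctx;			// where to go if co_entry ever returns
	makecontext(&co_ctx, co_entry, 0);

	swapcontext(&main_ctx, &co_ctx);	// enter the coroutine
	printf("back in main\n");
	swapcontext(&main_ctx, &co_ctx);	// resume it, retaining its execution state
	return 0;
}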

Strictly speaking, each task is not itself a coroutine and instead there is really only one type of coroutine - the worker coroutine. The whole library is built around instances of this coroutine running on a set of threads and yielding execution to one another when appropriate. The worker routine itself is fairly simple, looking a little like:

void worker()
{
	for (;;)
	{
		if (HasScheduledTasks())
		{
			ExecuteTask();
		}
		else if (HasScheduledWorkers())
		{
			YieldTo(NextWorker()); // current fiber becomes 'inactive'
		}
		else
		{
			SleepUntilSignaled();
		}
	}
}

When you first initialize the library it will spin up a number of threads (by default it aims for the same number of threads as there are hardware threads) and each one of these threads will execute one of these workers. When you schedule a task for execution, one of these workers will pick it up to execute it; at this point things don't look all that different from any other task processing library. However, returning to the silly example from the last post:

// ...
taco::Schedule([]() -> void {
	printf("Hello ");
	taco::Switch();
	printf("World\n");
});
// ...

When this task gets executed and makes the call to Switch, it will cause the worker it is running on to be suspended and rescheduled for future execution. At this point if there are more tasks to be started then we will invoke an inactive worker (or create a new one); if there are no more tasks pending we will instead execute a previously scheduled fiber (e.g. like the one we just suspended). In code this looks a little like this:

void Switch()
{
	fiber current = FiberCurrent();
	fiber next = HasScheduledTasks() ? GetInactiveWorker() : NextWorker();
	if (next)
	{
		YieldTo(next, [=]() -> void { ScheduleFiber(current); });
	}
}

This looks a little different in the actual code, but it should give you the right idea. The lambda being passed in as an argument is actually executed by the next worker at the point it is resumed - it is important to schedule the fiber we are currently executing only after we have actually switched to the next one (we do not want another thread to pick it up and try to start executing it before this thread has switched away from it).

Since we are executing tasks on a number of different threads, it follows that some amount of synchronization between tasks might be required. However, with the ability to suspend and resume our tasks at any point, we don't need to rely on the normal thread-blocking synchronization primitives. It instead becomes very simple to build our own non-blocking synchronization routines - if we can't acquire a lock, we just switch to another worker until we can. This can look as simple as:

void LockMutex(mutex * m)
{
	uint32_t ticket = m->lock_counter.fetch_add(1);
	while (m->unlock_counter.load() != ticket)
	{
		Switch();
	}
}
void UnlockMutex(mutex * m)
{
	m->unlock_counter++;
}
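
For reference, the mutex state this ticket scheme relies on is just a pair of atomic counters; a minimal sketch (the actual taco type may look a little different):

#include <atomic>
#include <cstdint>

struct mutex
{
	std::atomic<uint32_t> lock_counter{0};		// next ticket to hand out
	std::atomic<uint32_t> unlock_counter{0};	// ticket currently allowed to hold the lock
};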

So, this is maybe a little redundant with the previous post, but hopefully it explains a little more what taco is about and how it works. Right now I am prototyping a mechanism for marking regions of a task as doing "blocking" work - e.g. something like blocking file i/o that will spend a lot of time waiting instead of computing. The idea is that these regions will get migrated off their current thread to a thread dedicated to handling these regions of work, allowing the current thread to continue processing other tasks. Early testing shows some promising results - but I'll save that for a follow-on post.

Off and on for a while now, I have been playing around with a little library I am calling taco - for "task coroutines." After spending some time working on a Unity project and after reading through Charles Bloom's many posts on the subject (e.g. Coroutine-centric Architecture), I decided to take a crack at creating a coroutine based task processing system in C++. It has gone through several false starts and rewrites as I tried to come up with a base that I felt was reasonable; I think at this point I am ready to start talking about it.

At the core of taco is a worker coroutine, instances of which are cooperatively scheduled across a set of system threads. Rather than make each task a full-blown coroutine with its own execution context (wasteful for many of the types of tasks you might normally expect to execute in a game engine), taco switches between its own internal worker coroutines, which are themselves executing a task processing loop. In this way, taco only needs to create and switch to new workers when a task being executed actually requires it (e.g. when it waits on some event).

Fundamentally, all the API user sees are the functions Schedule and Switch, along with a handful of synchronization objects. A most basic and silly example:

// ...
taco::Schedule([]() -> void {
	printf("Hello ");
	taco::Switch();
	printf("World\n");
});
// ...

The call to Schedule will push the function object into a queue of stealable tasks, and eventually the current thread (or a thief), via some worker, will pop that task out of the queue. Once a worker grabs this task it will print out "Hello " and then call Switch. When this happens the task is effectively promoted to a full-blown coroutine - internally, the current worker gets scheduled for future execution and we invoke some other idle worker to continue executing any remaining tasks. If there are no remaining tasks, a worker will instead check whether there are any other scheduled workers and invoke them. This means that the worker executing our "Hello World" task eventually gets invoked by some other worker, and so we are able to execute our final line, printing out "World\n". With the task complete, our worker can move on to executing other tasks or yielding to other workers.

That is really the core of it - but we can build on that foundation and introduce some useful constructs. For example, you might have seen the proposal for an async/await facility in some future version of C++. I have coded up a simple future object in taco, using the publicly facing API, that provides similar functionality; it simply wraps the function call in a lambda that forwards the return value and signals an event, waking any other tasks that are waiting on the return value. Some example usage code:

unsigned fibonacci(unsigned n)
{
	if (n < 2) return 1;

	auto a = taco::Start(fibonacci, n - 1);
	auto b = taco::Start(fibonacci, n - 2);

	return a + b;
}

In this example both a and b have type taco::future<unsigned> and when we hit the return statement we end up switching away from the current worker until the values in a and b are ready.
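
Under the hood, the wrapping described above amounts to something like the following sketch, built only on Schedule and Switch with hypothetical names (the real taco::future signals an event to wake waiters rather than polling a flag):

#include <atomic>
#include <memory>

// Hypothetical sketch: run a call on another worker and let other tasks wait on
// its result cooperatively. Not taco's actual future implementation.
struct shared_result
{
	unsigned value = 0;
	std::atomic<bool> ready{false};
};

std::shared_ptr<shared_result> start_sketch(unsigned n)
{
	auto state = std::make_shared<shared_result>();
	taco::Schedule([=]() -> void {
		state->value = fibonacci(n);							// run the wrapped call
		state->ready.store(true, std::memory_order_release);	// publish the result
	});
	return state;
}

unsigned await_sketch(const std::shared_ptr<shared_result> & state)
{
	// Cooperatively wait: switch to other workers until the result is published.
	while (!state->ready.load(std::memory_order_acquire))
	{
		taco::Switch();
	}
	return state->value;
}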

Similarly, I have put together some basic code to support generators. Using that we could write a fibonacci generator something like this:

auto fibonacci = taco::StartGenerator([]() -> unsigned {
	unsigned a = 0;
	unsigned b = 1;
	for (;;)
	{
		taco::YieldValue(b);
		unsigned tmp = a + b;
		a = b;
		b = tmp;
	}
});

In this example, fibonacci has type generator<unsigned> with methods read and completed, allowing us to keep reading values from a generator until we detect completion (if it terminates). As the code works now, a generator will yield a value and then will be suspended until that value is read (and it assumes a single reader); once the value is read, the generator is rescheduled so that we can generate the next value.
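
Reading from the generator then looks something like this hypothetical usage sketch - I'm assuming read returns the next value and completed reports termination, so the exact signatures may differ from the real code:

// Hypothetical usage - the exact generator API may differ.
for (unsigned i = 0; i < 10 && !fibonacci.completed(); i++)
{
	// read returns the yielded value and reschedules the generator
	// so it can produce the next one.
	unsigned value = fibonacci.read();
	printf("%u ", value);
}
// Prints the first ten values: 1 1 2 3 5 8 13 21 34 55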

So that is more or less the high-level state of it right now. Check it out on github and feel free to point and laugh. No, really, I appreciate criticism/feedback. The code is definitely not production ready right now, but I plan to continue to hammer on it. In the short term I am working on some logging and profiling code, and then maybe I will add some OSX support (currently works on Windows and Linux, OSX should be trivial).

Coding: Four Greatest

I went ahead and let myself get a little distracted today after reading a tweet from Daniel Collin over at DICE. He posted a bit of code that uses SSE intrinsics to find the four highest valued floats in an array of floats (along with their indices). The original code looked like this:

void find_four(const float * a, size_t sz, float * fres, int * ires)
{
	__declspec(align(16)) float sinit = -FLT_MAX;
	__declspec(align(16)) int iinit[4] = {-1, -1, -1, -1};

	// Initialize all the scores to -FLT_MAX
	__m128 s = _mm_load_ps1(&sinit);

	// We just do shuffles and blends of the indices, so we store the ints as floats.
	__m128 index = _mm_load_ps((float*)iinit);

	int i = 0;
	for(const float* pa = a, *paend = a + sz; pa != paend; ++pa, ++i)
	{
		// Load the index into all 4 elements of im
		__m128 im = _mm_load_ps1((float*)&i);

		// Load a value from the array into all 4 elements in v
		__m128 v = _mm_load_ps1(pa);

		// Compare with the currently best scores
		__m128 cmp = _mm_cmpge_ps(v, s);

		// Convert to a mask which is one of 0000, 1000, 1100, 1110 or 1111
		// Switch on the mask and shuffle/blend as appropriate.
		// The same operation is done on both s and index to keep them in sync.
		switch(_mm_movemask_ps(cmp))
		{
		case 0x0:
			// dcba -> dcba
		break;
		case 0x8:
			// dcba -> Vcba
			s = _mm_blend_ps(s, v, 8);
			index = _mm_blend_ps(index, im, 8);
		break;
		case 0xc:
			// dcba -> cVba
			s = _mm_shuffle_ps(s, s, _MM_SHUFFLE(2, 2, 1, 0));
			s = _mm_blend_ps(s, v, 4);
			index = _mm_shuffle_ps(index, index, _MM_SHUFFLE(2, 2, 1, 0));
			index = _mm_blend_ps(index, im, 4);
		break;
		case 0xe:
			// dcba -> cbVa
			s = _mm_shuffle_ps(s, s, _MM_SHUFFLE(2, 1, 1, 0));
			s = _mm_blend_ps(s, v, 2);
			index = _mm_shuffle_ps(index, index, _MM_SHUFFLE(2, 1, 1, 0));
			index = _mm_blend_ps(index, im, 2);
		break;
		case 0xf:
			// dcba -> cbaV
			s = _mm_shuffle_ps(s, s, _MM_SHUFFLE(2, 1, 0, 0));
			s = _mm_blend_ps(s, v, 1);
			index = _mm_shuffle_ps(index, index, _MM_SHUFFLE(2, 1, 0, 0));
			index = _mm_blend_ps(index, im, 1);
		break;
		default:
			assert(0);
		break;
		}
	}

	_mm_store_ps(fres, s);
	_mm_store_ps((float*)ires, index);
}

You can write up a more straightforward plain scalar version of this code:

void find_four_scalar(const float * a, size_t sz, float * fres, int * ires)
{
	fres[0] = fres[1] = fres[2] = fres[3] = -FLT_MAX;
	ires[0] = ires[1] = ires[2] = ires[3] = -1;
	int i = 0;
	for(const float* pa = a, *paend = a + sz; pa != paend; pa++, i++)
	{
		float v = *pa;
		if (v >= fres[0])
		{
			fres[3] = fres[2];
			fres[2] = fres[1];
			fres[1] = fres[0];
			fres[0] = v;
			ires[3] = ires[2];
			ires[2] = ires[1];
			ires[1] = ires[0];
			ires[0] = i;
		}
		else if (v >= fres[1])
		{
			fres[3] = fres[2];
			fres[2] = fres[1];
			fres[1] = v;
			ires[3] = ires[2];
			ires[2] = ires[1];
			ires[1] = i;
		}
		else if (v >= fres[2])
		{
			fres[3] = fres[2];
			fres[2] = v;
			ires[3] = ires[2];
			ires[2] = i;
		}
		else if (v >= fres[3])
		{
			fres[3] = v;
			ires[3] = i;
		}
	}
}

Given an array of 2^25 random floats and testing on my i5-3317U, I get the following times:

find_four: 54 ms
find_four_scalar: 79 ms

We can provoke the worst case behaviour for the SSE implementation by making the array of values monotonically increasing, giving us:

find_four: 148 ms
find_four_scalar: 68 ms

And best case behaviour by making sure the four highest values are at the very start:

find_four: 54 ms
find_four_scalar: 79 ms

So the question is - can we do better? As it turns out we can make a simple adjustment that improves performance quite a bit for the random and best cases and has only a small impact on the worst case. The SSE version still works on one float at a time, but we can adjust it to potentially reject groups of floats at a time (in this case I will try 8). Specifically, I load up 8 floats and, using some shuffles and _mm_max_ps, I get the maximum value of those 8 floats; if that maximum is less than all four of our current best then we can just skip to the next 8. Simple. The code:

inline void cmp_one_to_four(int i, __m128 v, __m128 & s, __m128 & index)
{
	// Load the index into all 4 elements of im
	__m128 im = _mm_load_ps1((float*)&i);
 
	// Compare with the currently best scores
	__m128 cmp = _mm_cmpge_ps(v, s);
 
	// Convert to a mask which is one of 0000, 1000, 1100, 1110 or 1111
	// Switch on the mask and shuffle/blend as appropriate.
	// The same operation is done on both s and index to keep them in sync.
	switch(_mm_movemask_ps(cmp))
	{
	case 0x0:
		// dcba -> dcba
	break;
	case 0x8:
		// dcba -> Vcba
		s = _mm_blend_ps(s, v, 8);
		index = _mm_blend_ps(index, im, 8);
	break;
	case 0xc:
		// dcba -> cVba
		s = _mm_shuffle_ps(s, s, _MM_SHUFFLE(2, 2, 1, 0));
		s = _mm_blend_ps(s, v, 4);
		index = _mm_shuffle_ps(index, index, _MM_SHUFFLE(2, 2, 1, 0));
		index = _mm_blend_ps(index, im, 4);
	break;
	case 0xe:
		// dcba -> cbVa
		s = _mm_shuffle_ps(s, s, _MM_SHUFFLE(2, 1, 1, 0));
		s = _mm_blend_ps(s, v, 2);
		index = _mm_shuffle_ps(index, index, _MM_SHUFFLE(2, 1, 1, 0));
		index = _mm_blend_ps(index, im, 2);
	break;
	case 0xf:
		// dcba -> cbaV
		s = _mm_shuffle_ps(s, s, _MM_SHUFFLE(2, 1, 0, 0));
		s = _mm_blend_ps(s, v, 1);
		index = _mm_shuffle_ps(index, index, _MM_SHUFFLE(2, 1, 0, 0));
		index = _mm_blend_ps(index, im, 1);
	break;
	default:
		assert(0);
	break;
	}
}
		
void find_four_mod(const float * a, size_t sz, float * fres, int * ires)
{
	__declspec(align(16)) float sinit = -FLT_MAX;
	__declspec(align(16)) int iinit[4] = {-1, -1, -1, -1};
 
	__m128 s = _mm_load_ps1(&sinit);
	__m128 index = _mm_load_ps((float*)iinit);
 
	int i = 0;
	// Note: this loop assumes 'a' is 16-byte aligned and sz is a multiple of 8.
	for(const float* pa = a, *paend = a + sz; pa != paend; pa += 8, i += 8)
	{
		__m128 m = _mm_max_ps(_mm_load_ps(pa), _mm_load_ps(pa + 4));
		m = _mm_max_ps(_mm_max_ps(_mm_shuffle_ps(m,m,_MM_SHUFFLE(0,0,0,0)), _mm_shuffle_ps(m,m,_MM_SHUFFLE(1,1,1,1))),
				_mm_max_ps(_mm_shuffle_ps(m,m,_MM_SHUFFLE(2,2,2,2)), _mm_shuffle_ps(m,m,_MM_SHUFFLE(3,3,3,3))));
		
		if (_mm_movemask_ps(_mm_cmpge_ps(m, s)) == 0)
			continue; 

		__m128 a = _mm_load1_ps(pa);
		__m128 b = _mm_load1_ps(pa + 1);
		__m128 c = _mm_load1_ps(pa + 2);
		__m128 d = _mm_load1_ps(pa + 3);
		m = _mm_max_ps(_mm_max_ps(a,b), _mm_max_ps(c,d));
		if (_mm_movemask_ps(_mm_cmpge_ps(m, s)) != 0)
		{
			cmp_one_to_four(i, a, s, index);
			cmp_one_to_four(i + 1, b, s, index);
			cmp_one_to_four(i + 2, c, s, index);
			cmp_one_to_four(i + 3, d, s, index);	
		}

		a = _mm_load1_ps(pa + 4);
		b = _mm_load1_ps(pa + 5);
		c = _mm_load1_ps(pa + 6);
		d = _mm_load1_ps(pa + 7);
		m = _mm_max_ps(_mm_max_ps(a,b), _mm_max_ps(c,d));
		if (_mm_movemask_ps(_mm_cmpge_ps(m, s)) != 0)
		{
			cmp_one_to_four(i + 4, a, s, index);
			cmp_one_to_four(i + 5, b, s, index);
			cmp_one_to_four(i + 6, c, s, index);
			cmp_one_to_four(i + 7, d, s, index);	
		}
	}

	_mm_store_ps(fres, s);
	_mm_store_ps((float*)ires, index);
}

How does this clock in? Running the same test with this code yields:

Random-Case: 12 ms
Worst-Case: 159 ms
Best-Case: 12 ms

So over a 4x improvement for the best/random cases, and only slightly slower in the worst case scenario.

Anyway, that was a fun little distraction from the networking code I was otherwise working on...

Coding: V8 Garbage Collection

I went ahead and extracted the V8 wrapping code from my hobby engine and have put it up on github with some example code: v8wrap

There are other fine V8 wrappers out there, and V8 is pretty easy to work with directly anyway, but maybe someone will find it useful and I'd love feedback/criticism from anyone who wants to offer it. One thing I would like to point out about the code, though, is the InstallGC/ForceGC functions. I've been reading a number of different blog entries from around the net about working with V8, and pretty much all of them will tell you that in order to get V8 to perform garbage collection you need to do something like this:

v8::V8::AdjustAmountOfExternalAllocatedMemory(SOME_LARGE_VALUE);

// or

while(!v8::V8::IdleNotification()) {};

While it is true that both of these methods can invoke the garbage collector, there is a more direct and thorough approach. If you make a call to v8::V8::SetFlagsFromString with the argument "--expose-gc" then a new global "gc" function will be exposed to your scripts, allowing you to very directly invoke the garbage collector.
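
Something along these lines - the helper name here is just for illustration, and the exact SetFlagsFromString signature depends on your V8 version:

#include <v8.h>

// Expose the global gc() function to scripts; call this before creating contexts.
void install_gc_example()
{
	const char flags[] = "--expose-gc";
	v8::V8::SetFlagsFromString(flags, sizeof(flags) - 1);
}

// From script, garbage collection can then be requested directly:
//   gc();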