20170108

Advent of Code Day 14 Parallelization

Ahhh, the holidays are a wonderful time of the year. What? No, it has nothing to do with Christmas, or snow, or vacation. OK, maybe that last one. But especially because of the annual return of Advent of Code, an advent calendar of small programming puzzles that releases gradually throughout the month of December. The puzzles are self-contained, but some are still pretty tricky. It's a great way to expand your horizons or learn a new language.

Each puzzle has two parts. Solve the first part, and the second part unlocks. It's usually a twist on the first part, requiring anywhere from just a constant change to a massive rewrite. A recurring bugaboo in second parts is a tweak that ramps up the computation time needed by a large amount. If your first part wasn't implemented in a performant way, your second part might consume a ton of memory or take ages to run.

In case you're curious, the full code for all my solutions so far is up in a GitHub repository.

Day 14 this year did exactly that. The first part is not too hard: generate MD5 hashes, keep track of "candidate" ones, then either confirm or age them out within the next 1000 hashes you generate. The second part added something called "hash stretching". Instead of generating a single hash, hash the output of the hash over and over, 2016 times. The program will end up taking a lot longer to run, because each hash takes two thousand times as long to generate.

I can already tell that's going to be annoyingly long, so I'll want to try to speed things up. In this blog post I will take you down the rabbit hole to see different ways to make it go faster.

Bad serial performance

My original implementation took about 3 seconds to run part 1. Part 2 took 458 seconds. Holy cow, that's a lot! I know I can do better. Part of my mind spotted obvious slow computations to parallelize for a speedup, but another part of my mind reined it in and said, "hey, bud, isn't that premature optimization?" Yeah, that's right. I should probably profile my code and see what's actually taking so long.

I've been doing all my code in F# this year and been loving it. I now feel pretty comfortable in it for small programs. Anyway, the Visual Studio profiler worked seamlessly and easily to profile a few seconds' worth of execution and show me what's going so slow. Let's take a look.

What the fresh garbage is going on here?? Sure, 20% of the execution time is taken in computing MD5 hashes, but I was expecting that to be the slowest part, not the 50% spent in converting to a hex string. If the bottleneck is in my own code, well, I should just take care of that. Let's see what this ornery code looks like.

let toByteArray str =
    Seq.toArray (Seq.map (fun (elem : char) -> Convert.ToByte(elem)) str)
;;

let toHexString byteArray =
    Array.fold (fun state (elem : byte) -> state + (sprintf "%02x" elem)) "" byteArray
;;

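// Note: g_md5 is a shared MD5 hasher created elsewhere, e.g. System.Security.Cryptography.MD5.Create().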
let rec generateStretchedHash stretchSize (bytes:byte[]) =
    let md5Bytes = g_md5.ComputeHash(bytes) in
    if stretchSize = 0 then
        md5Bytes
    else
        generateStretchedHash (stretchSize - 1) (toByteArray (toHexString md5Bytes))
;;

The MD5 function takes a byte array as an input and produces a byte array as output. The problem says to hash the lowercase hexadecimal string representation of the previous hash, so I do have to convert from byte array output to a hex string, then to a byte array for input. But apparently, as profiling shows, sprintf might be the worst way to do it. Let's speed this up, shall we?

The conversion from 0x12 to "12" to [| 0x31; 0x32 |] (that's the ASCII representation) should be really straightforward. I should be able to skip straight from the input byte to the output pair of ASCII bytes.

let toByteArray str =
    Seq.toArray (Seq.map (fun (elem : char) -> Convert.ToByte(elem)) str)
;;

let BYTE_TO_STRING = [| for i in 0 .. 0xff -> sprintf "%02x" i |];;
let BYTE_TO_BYTE_STRING = Array.map toByteArray BYTE_TO_STRING;;

let byteArrayToStringByteArray bytes =
    let output = Array.create ((Array.length bytes) * 2) 0uy in
    Array.iteri (fun i b ->
        let nibbles = BYTE_TO_BYTE_STRING.[int b] in
        let _ = output.[i * 2] <- nibbles.[0] in
        output.[(i * 2) + 1] <- nibbles.[1]
    ) bytes;
    output
;;

let rec generateStretchedHash stretchSize (bytes:byte[]) =
    let md5Bytes = g_md5.ComputeHash(bytes) in
    if stretchSize = 0 then
        md5Bytes
    else
        generateStretchedHash (stretchSize - 1) (byteArrayToStringByteArray md5Bytes)
;;
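
A quick sanity check of the lookup tables (purely illustrative) shows the mapping in action:

printfn "%s" BYTE_TO_STRING.[0x12];;        // prints "12"
printfn "%A" BYTE_TO_BYTE_STRING.[0x12];;   // prints [|49uy; 50uy|], the ASCII codes for '1' and '2'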

So now I pre-calculate all 256 mappings of byte-to-two-bytes that represent the ASCII hex chars for that byte. Let's look at the profile now, shall we?

That's much better. The slowest path is the system library for MD5 generation (i.e. Not My Code), so now I'm in a good place to apply some parallelization. Maybe I should have left my code slow, though... as my parallel programming professor said, "bad serial performance makes it easy to get a good parallel speedup."

The right way to slice and dice it

I don't have a ton of experience with parallel programming, but to me, the fun part is how to partition the problem. The top considerations are how to amortize the cost of farming out data to the parallel workers and how to avoid lock contention when the workers need to access shared data. I like this problem because it gives some options for how to do this.

To review the problem again, I have to generate 64 "keys". Each key is found by examining a hash of some salt and an increasing "index" number. If the hash has a certain property (3+ consecutive nibbles with the same value; see the sketch below), then it's a candidate, and we need to examine the hashes of the next 1000 indices for one with 5 consecutive nibbles with the same value to confirm it as a "key". Here are two ways I thought about partitioning the problem.
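
Quick aside before the partitioning options: the candidate and key checks boil down to looking for runs of identical nibbles in a hash. Here's a rough sketch of how that might look; the helper names are hypothetical, not lifted from my solution.

let nibblesOf (bytes:byte[]) =
    Array.collect (fun (b:byte) -> [| b >>> 4; b &&& 0xfuy |]) bytes
;;

let hasRunOfLength n (nibbles:byte[]) =
    nibbles
    |> Seq.windowed n
    |> Seq.exists (fun w -> Array.forall (fun x -> x = w.[0]) w)
;;

A hash is a candidate if hasRunOfLength 3 (nibblesOf md5Bytes) is true, and a candidate gets confirmed if one of the next 1000 hashes passes hasRunOfLength 5.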

Create and confirm batches of candidates

I could spin up 2 workers in parallel (could be extended to n workers) who each generate candidates found in a batch of indices. Each worker would generate, say, 2000 hashes serially, find candidates in those hashes, and confirm any that it can. At the end of the 2000 hashes, it would have to keep generating more hashes to age out or confirm any candidates it still had left, but not generate more candidates, because those would be owned by another worker.

This approach does get us parallelism by having workers generating hashes in parallel (the slow path from the serial profile above), but workers would be doing duplicate work. Say index 1500 (owned by worker 1) is a candidate that's confirmed by index 2100. Both worker 1 and worker 2 (who is responsible for indices 2000 - 3999) would generate hashes 2000 - 2100, because worker 1 has to confirm its lingering candidates and worker 2 has to find new candidates in that range.

The duplication of work could be mitigated by having whichever worker generates the hash for a given index first store it in some shared memory, but now you have to consider how much space you want to dedicate to storing those hashes. Maybe a ring buffer so they cycle out? Does that mean a worker has to scan the storage to see if a given index is present? How much of the storage does it need to scan? Or is there some other data structure to make that faster to look up?

Too many problems. I don't want to deal with all that. Let's do something easier.

Generate hashes as fast as possible

If the slow part is the hash generation, why don't we try to do only that as fast as possible? I can have n workers whose sole job is to produce hashes. The main thread will consume the already-created hashes and do the candidate creation and confirmation.

The thing I like about this is there are fewer points of contention. Each worker needs to know which index to generate a hash for next and where to store it. I made each worker have its own ring buffer in a global array. The main thread has to scan the n workers' ring buffers to find the next index it's looking for, but that's a very small scan, because the next index is guaranteed to be at the front of one of the ring buffers.

Implementing the ring buffer

I have a love/hate relationship with ring buffers. We use them all the time at work, but they're still a bit annoying to remember the logistics behind. The one I wrote here is pretty simple. I'll dump the code here and point out a few things about it.

type RingBuffer<'a>(size, initializer) =
    let storage = Array.create size initializer
    let mutable readIndex = 0
    let mutable writeIndex = 0

    member this.append value =
        let work () =
            let nextWriteIndex = (writeIndex + 1) % (Array.length storage) in
            if nextWriteIndex = readIndex then
                false
            else
                let _ = storage.[writeIndex] <- value in
                let _ = writeIndex <- nextWriteIndex in
                true
        in
        lock storage work

    member this.peek () =
        if readIndex = writeIndex then
            None
        else
            Some storage.[readIndex]

    member this.consumeFirst () =
        let work () =
            let itemOpt = this.peek () in
            let _ =
                if itemOpt.IsSome then
                    readIndex <- (readIndex + 1) % (Array.length storage)
                else
                    ()
            in
            itemOpt
        in
        lock storage work
;;

Just three methods. Not too bad. Like most ring buffers, it's a FIFO queue. You can only write to the end of it and can only read from the front of it, one item at a time.

The lock function in F# (a library function, not a keyword as it is in C#) takes any object and uses it as a mutex, so I just use the storage array itself as the lock object. A basic rule of concurrent programming is that, in general, any two accesses to variables that depend on each other should be done under a lock, so you know nobody else modified them in between your two accesses. Let's take a quick look at each function under this lens.
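
In isolation, lock usage looks like this (a standalone sketch, not code from the solution):

let counterLock = obj ();;
let mutable counter = 0;;

// The lambda runs while the monitor on counterLock is held; the lock is released when it returns.
let increment () =
    lock counterLock (fun () -> counter <- counter + 1)
;;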

append accesses both readIndex and writeIndex in multiple places. If either of those got modified midway through, the logic would be disturbed, so I take the lock for the whole function. The function returns a bool indicating if it was actually able to append the value. If false, it means the buffer was full, and it'll need to retry after someone calls consumeFirst.

consumeFirst is roughly the same. It accesses readIndex and writeIndex, both directly and indirectly through peek, so needs to be locked.

peek is a funny one. It sure doesn't look safe, the way it reads readIndex and writeIndex without taking the lock, but in this particular program I can reason about it and know that it's okay. Hear me out: I know readIndex is safe because the only thread that modifies it is the same thread that's calling peek. writeIndex could be modified while a peek call is in progress, but that can only add elements to the buffer, not remove them, so peek is never in danger of returning an invalid element.

Producers and consumers

The meat of the move to parallel hash generators is producing the hashes and consuming the hashes. Before we look at that, let's look at the difference in the main part of the program when I parallelize the hash generation. This is roughly what it looked like before.

let generateKeys numKeysToGenerate =
    let rec helper keysSoFar candidateKeys index =
        let md5Bytes = generateStretchedHash index in
        let (newKeysSoFar, newCandidateKeys) = processCandidates md5Bytes in
        if (List.length keysSoFar) > numKeysToGenerate && (List.isEmpty candidateKeys) then
            keysSoFar
        else
            helper newKeysSoFar newCandidateKeys (index + 1UL)
    in
    helper [] [] 0UL
;;

That is, generate one hash at a time, then make new candidates and confirm candidates until you have enough keys and there are no more candidates. I've omitted some details that don't matter for the parallelization. The beautiful part is there's only one line change in the parallelized version:

...
let md5Bytes = consumeNextHash index in
...

Let's take a look at what consumeNextHash does, shall we?

Consumers

I know I said "producers and consumers" before, but I can't pass up a good segue, so I'm flipping the ordering to "consumers and producers".

let consumeNextHash index =
    let rec helper () =
        let tryConsumeHashFromRingBuffer (hashOpt:byte[] option) (ringBuffer:RingBuffer<uint64 * byte[]>) =
            if hashOpt.IsNone then
                match ringBuffer.peek () with
                | Some (indexInRingBuffer, hashInRingBuffer) ->
                    if indexInRingBuffer = index then
                        let consumed = ringBuffer.consumeFirst () in
                        let _ = assert(consumed.IsSome) in
                        Some hashInRingBuffer
                    else
                        hashOpt
                | None -> hashOpt
            else
                hashOpt
        in
        match Array.fold tryConsumeHashFromRingBuffer None g_hashStorage with
        | None -> helper ()
        | Some hsh -> hsh
    in
    helper ()
;;

It looks like there's a lot going on here, but it's not too bad. Recall the global array of ring buffers, one for each worker that's producing hashes: g_hashStorage. The Array.fold call here searches them, trying to find the next desired index at the front of each. That's what tryConsumeHashFromRingBuffer does.
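
The declaration of that array doesn't appear in the snippets in this post; it's set up roughly like this, where the worker count and buffer size are illustrative:

let NUM_WORKERS = 5;;
let RING_BUFFER_SIZE = 10;;

// One ring buffer per hashing worker; each slot holds an (index, hash) pair.
let g_hashStorage =
    Array.init NUM_WORKERS (fun _ -> RingBuffer<uint64 * byte[]>(RING_BUFFER_SIZE, (0UL, [||])))
;;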

It has to peek each ring buffer and only consume the first if it found the right index. The interesting gotcha here is that if the hashes aren't being produced fast enough, the ring buffers might be empty or might not yet have the next needed hash. What should I do in that case? Well, what I pasted above just loops back around and tries again. It spins and waits until it gets the hash it needs. That's not unlike what the serial version was doing. Some of you are cringing at this already. As we'll see later, there's a better way.

Producers

Let's see the side that produces the hashes. Remember, this one will be run n times in parallel. It has to keep picking an index to generate the hash for, then store it in its ring buffer for the consumer to pick up.

let g_nextIndex = ref 0L;;
let createHasher workerNum =
    async {
        let getNextIndex () =
            (Microsoft.FSharp.Core.Operators.uint64 (System.Threading.Interlocked.Increment(g_nextIndex))) - 1UL
        in
        let rec generateAndStoreNextHash index =
            let hsh = generateStretchedHash index in
            let rec storeHash () =
                if (g_hashStorage.[workerNum].append (index, hsh)) then
                    ()
                else
                    storeHash ()
            in
            let _ = storeHash () in
            generateAndStoreNextHash (getNextIndex ())
        in
        generateAndStoreNextHash (getNextIndex ())
    }
;;

There's a bit of fun stuff here. Let's start with getNextIndex. I don't want two workers doing a hash of the same index, so there needs to be some kind of locking around g_nextIndex. A substitute for locking, especially on a single piece of data with a simple access model--in this case "get the latest value and increment"--is an interlocked operation. Interlocked.Increment atomically increments the value and returns the new result, so no additional lock is needed.

A little bugaboo: since Interlocked.Increment takes a signed int64, and I'm using an unsigned uint64, I need to cast it. But since I am compiling with --checked+ (generate underflow/overflow checks), I can't use the usual cast from int64 to uint64. Luckily, all the F# operators are callable like functions, and the "unchecked" casting operators are in the Microsoft.FSharp.Core.Operators namespace, so I can bypass my overflow checks in just this spot by calling the fully qualified operator name. Whew!
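
For illustration, here's the difference in a nutshell (this assumes --checked+ is in effect, as it is in my build):

let signed = -1L;;
// let checkedCast = uint64 signed;;   // with --checked+ this is the checked conversion; throws System.OverflowException
let uncheckedCast = Microsoft.FSharp.Core.Operators.uint64 signed;;   // unchecked: wraps to 0xFFFFFFFFFFFFFFFFUL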

generateStretchedHash is the same as earlier in the blog post. storeHash has a similar problem to consumeNextHash in that if the ring buffer is full--i.e. the main thread is not pulling hashes out fast enough--it needs to loop and keep trying to store the hash until there is space. You'll notice this is another spin loop.

So, is it faster?

I don't feel like waiting for the whole 64 keys to be generated, so I'll do perf testing with an abbreviated run, just 16 keys instead. The serial version takes about 21.8 sec on my laptop to run. The parallel version with 5 workers takes... 22 seconds!? It was supposed to go faster! What have I done wrong? How do I even tell? By now you should know the answer: back to the profiler!

Let's see what the profile looks like. Well, there are two threads going on, so let's see both of them. The first one is what we'd expect: it's our worker threads doing hash generation.

So that accounts for 70% of the CPU time. Where's the other 30% gone? Well, there's our main thread.

Since we're sitting in a tight loop checking the ring buffers for the next index, it consumes a lot of CPU time, and that shows up in the profile. What's worse, this thread competes with other threads for CPU time, resulting in context switches that slow everything down. I alluded to it before, but there has to be a better way.

Getting rid of spin waits

The main thread only has work to do if the next index it needs has been produced by one of the worker threads. Since the main thread consumes hashes much faster than the worker threads produce them, there will inevitably be some time to wait. That time could be given to one of the worker threads, if the main thread goes to sleep. So let's put it to sleep!

We're going to use something called an Event. In this case, an AutoResetEvent. This is just a wrapper around a Win32 event. One thread can wait on the event until it is "signaled", and another thread can signal it whenever it wants. The waiting thread is put into a suspended state until the wait completes, which doesn't consume any CPU time. Auto-reset just means that when a wait completes, the event is automatically reset to the unsignaled state, so you can't wait on it twice in a row; a second wait will block until the event is signaled again.
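
The event itself is a single shared instance; its declaration doesn't appear in the snippets here, but it's something like this:

open System.Threading;;

// Shared by the main thread and all the workers; starts out unsignaled.
let g_hashReadyEvent = new AutoResetEvent(false);;

With that in place, the consumer's retry loop waits on the event instead of spinning: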

match Array.fold tryConsumeHashFromRingBuffer None g_hashStorage with
| None ->
    let _ = g_hashReadyEvent.WaitOne() in
    helper ()
| Some hsh -> hsh

That's just one half. Now the worker thread needs to signal the event when it produces a new hash, in case it's the one the main thread is waiting on. Think back to storeHash, above.

let rec storeHash () =
    if (g_hashStorage.[workerNum].append (index, hsh)) then
        let _ = g_hashReadyEvent.Set() in
        ()
    else
        storeHash ()

With this tiny change, let's see how fast it goes. With spin waits, it took 22 seconds. With the event, it takes 16.5 seconds. That's a nice speedup! The processor did a lot better with scheduling more work when the main thread is asleep.

A little QPC timing

With the change to use an event rather than spin-waiting, the main thread disappears completely from the profile. I'd still like to be able to see how much time the main thread spends doing things like waiting for its next hash, so I'll just have to add the timing myself. One decent way to do this is with QueryPerformanceCounter. But that's a Win32 function, whereas F# is a .NET/managed language, so I'll have to write a wrapper. Luckily, the syntax to import a native function into F# code is pretty easy.

module QueryPerformance =
    open System.Runtime.InteropServices

    module Wrapper =
        [<DllImport("KERNEL32.dll")>]
        extern bool QueryPerformanceCounter(uint64& lpPerformanceCount)

        [<DllImport("KERNEL32.dll")>]
        extern bool QueryPerformanceFrequency(uint64& lpFrequency)

    let mutable frequency =
        let mutable result = 0UL in
        if Wrapper.QueryPerformanceFrequency(&result) then
            result
        else
            0UL

    let QueryPerformanceCounter () =
        let mutable result = 0UL in
        if Wrapper.QueryPerformanceCounter(&result) then
            result
        else
            0UL

    let millisecondsFromPerfCounts counts =
        1000UL * counts / frequency
;;

With this little utility, I added three counters: around the Interlocked.Increment call, around RingBuffer.append, and around RingBuffer.consumeFirst. These are basically the three locking points between the threads, and I want to know if threads are taking a long time at any of these points because they're contending with other threads.
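
The mechanics of each counter are simple. Here's a rough sketch of the idea (the accumulator name is illustrative, and this glosses over the fact that multiple threads updating one accumulator would need their own synchronization):

let g_appendWaitCounts = ref 0UL;;

// Run f, measure how long it took in performance counts, and add that to the accumulator.
let timed (accumulator : uint64 ref) f =
    let before = QueryPerformance.QueryPerformanceCounter () in
    let result = f () in
    let after = QueryPerformance.QueryPerformanceCounter () in
    let _ = accumulator := !accumulator + (after - before) in
    result
;;

// e.g. timed g_appendWaitCounts (fun () -> g_hashStorage.[workerNum].append (index, hsh)),
// then QueryPerformance.millisecondsFromPerfCounts !g_appendWaitCounts at the end of the run.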

Wait timings

I didn't use a profile to gather the measurements; instead, I ran about 800 trials and gathered the resulting data in a spreadsheet. For each configuration, I ran the test 5 times and present the average runtime in the spreadsheet.

Pivot Tables in Excel are awesome, perfect for looking at this kind of data, where there are multiple varying inputs, and you have data for all the combinations. First, let's look at those wait timings I just added measurements for.

This chart shows how much total time is spent in Interlocked.Increment, append, and consumeFirst, with anywhere from 1-20 workers; ring buffer sizes of 2 and 10; and with either spin waits or event waits.

First you'll notice that the Interlocked.Increment times are all basically 0. That makes sense, because it's usually implemented as a single processor instruction, so the time would not vary much, no matter how many threads are accessing the same memory.

Time spent waiting on consumeFirst is also basically 0. That must be because the consuming thread first uses peek to make sure there's a valid element to consume before calling consumeFirst, and it's only contending with one other thread.

The times for append are interesting. At a ring buffer size of 2 (which, in my implementation, means you can only store one element in the ring buffer), measurable time is spent in append, because the ring buffer has filled up and doesn't get drained fast enough to add more elements. At 10 and larger ring buffer sizes, this problem goes away. I probably should have measured 3, too.

Overall timings

Now let's look at the overall timings. The two colored bars on the following charts are total time spent in the program and how much of that time was spent waiting for hashes. I wanted to make sure that waiting for hashes was the majority of the execution time in all cases, and you'll see that it is, no matter how many workers (1-20), how big the ring buffer (2, 10, 50, 100, 1000), or whether it's using spin waits or event waits. First, let's look at how the ring buffer size affects the timings.

Fixing at 5 workers, you can see that, aside from the 1-element ring buffer, the time is basically unchanged for any other size. This makes sense, because the hash generation can't keep up with the speed of consuming hashes, even in parallel. Now let's fix the ring buffer size and see how number of workers affects the timings.

This chart shows event waits on the left side and spin waits on the right side. The best speeds are at about 5 workers or more. I don't know how to explain the spike at 9-10 workers. I looked at the profile of it, and it looked totally tame. Weird.

So why doesn't the time get much better after 5 workers? I broke into the debugger during an execution and saw something you can't see in the profiles: even with 20 asynchronous workers spun up, there are only 4 threadpool threads active at once, because my computer only has 4 virtual processors (2 cores, 2 hyperthreads per core), so any more than 4 tasks at a time probably has a slight perf degradation. The numbers don't quite show that, so it must be a negligible degradation for this program. It's an interesting revelation.

The rabbit hole, it just keeps going deeper and deeper. It never ends. At some point we have to stop. That point is now. Hopefully there was something interesting in here for you.
