// Source: exercises/093_async9.zig
//
// We've been using io.async() to launch tasks. But there's a
// stronger variant: io.concurrent().
//
// The difference:
//
//   io.async():
//     * The function MAY run on a separate unit of concurrency,
//       or it may run immediately on the caller (synchronously).
//     * Never fails — if no concurrency is available, it just
//       runs the function right away.
//     * More portable, works with all Io backends.
//
//   io.concurrent():
//     * GUARANTEES a separate unit of concurrency.
//     * Can fail with error.ConcurrencyUnavailable if resources
//       are exhausted or the backend doesn't support it.
//     * Use when you NEED the task to run independently of the
//       caller.
//
// What is a "unit of concurrency"? That depends on the backend!
// The Threaded backend uses OS threads. But the Evented backends
// (Uring, Kqueue, Dispatch) use M:N green threads / fibers,
// which can provide concurrency even on a SINGLE OS thread.
// Your code doesn't need to know the difference.
//
// Because concurrent() can fail, you must handle the error:
//
//     var future = try io.concurrent(myFn, .{args});
//     defer _ = future.cancel(io);
//     const result = future.await(io);
//
// Let's try a slightly simplified example from signal processing:
// Suppose we're looking for the beginning of a signal above the noise
// level. To do this, we compare each entry from beginning to end with
// the threshold. To speed things up a bit, we split the signal into
// two halves and have two parallel workers search one half each.
// Whichever worker finds the beginning first "wins" and thus ends the other one.
//
// As I said, this is a simplified explanation,
// but in practice it's done more or less like this.
//
const std = @import("std");
const Io = std.Io;
const print = std.debug.print;

/// Message a search worker posts on the shared queue when it finishes.
const SearchResult = struct {
    /// True when the worker saw a value at or above the threshold.
    found: bool,
    /// Id of the reporting worker; stays at the default 0 for "not found" results.
    worker_id: u8 = 0,
    /// Position of the hit in the full data array (the worker's base offset
    /// plus its local index); stays at the default 0 for "not found" results.
    index: usize = 0,
};

/// Splits `data` into two halves, starts one `searchThreshold` worker per half
/// with `io.concurrent()`, and prints the first hit reported through the queue.
///
/// Returns an error if a worker cannot be started (`io.concurrent()` can fail)
/// or if reading from the queue fails.
pub fn main(init: std.process.Init) !void {
    const io = init.io;

    const data = [_]u32{ 10, 23, 45, 67, 12, 69, 3, 54, 69, 42, 68, 56, 71, 79, 79, 75, 70, 77 };
    const threshold = 70;
    const mid = data.len / 2;

    // A queue with space for one result.
    var buf: [1]SearchResult = undefined;
    var queue = Io.Queue(SearchResult).init(&buf);

    // Launch two workers, each searching half the array.
    // io.concurrent() is used (instead of io.async()) because we want them
    // to be guaranteed separate units of concurrency; it can fail, hence `try`.
    // The argument tuple follows the parameter order of searchThreshold:
    // io, slice, threshold, base_offset, worker_id, queue.
    var f1 = try io.concurrent(searchThreshold, .{ io, data[0..mid], threshold, 0, 0, &queue });
    defer _ = f1.cancel(io);

    var f2 = try io.concurrent(searchThreshold, .{ io, data[mid..], threshold, mid, 1, &queue });
    defer _ = f2.cancel(io);

    // Each worker posts exactly one result unless it is canceled. Read at
    // most one result per worker and stop at the first hit, so that a
    // "not found" from one half does not hide a hit in the other half.
    // The deferred cancel() calls above then end whichever worker is
    // still running.
    var pending: usize = 2;
    while (pending > 0) : (pending -= 1) {
        const result = try queue.getOne(io);

        if (result.found) {
            print("Worker {} found signal start over threshold at index {}!\n", .{ result.worker_id, result.index });
            break;
        }
    }
}

/// Worker: walks `slice` front to back and reports through `queue` the first
/// element that is greater than or equal to `threshold`.
///
/// A hit is reported with `worker_id` and the index `base_offset + i`, where
/// `i` is the position inside `slice`. If no element qualifies, a result with
/// `found = false` is posted instead. If the pause or the queue write returns
/// an error, the worker returns without reporting.
fn searchThreshold(
    io: Io,
    slice: []const u32,
    threshold: u32,
    base_offset: usize,
    worker_id: u8,
    queue: *Io.Queue(SearchResult),
) void {
    const pause = Io.Duration.fromMilliseconds(1);

    var pos: usize = 0;
    while (pos < slice.len) : (pos += 1) {
        // The short sleep gives the worker a point at which it can be
        // canceled once the other worker has already finished; without it
        // every worker would run through to the end of its slice.
        io.sleep(pause, .awake) catch return;

        // To watch the workers, uncomment the next line
        // and then comment out the pause above.
        // print("id: {} - val: {}\n", .{ worker_id, slice[pos] });

        // Guard clause: values below the threshold are skipped.
        if (slice[pos] < threshold) continue;

        const hit: SearchResult = .{
            .found = true,
            .worker_id = worker_id,
            .index = base_offset + pos,
        };
        queue.putOne(io, hit) catch return;
        return;
    }

    // The whole slice was scanned without a hit.
    const miss: SearchResult = .{ .found = false };
    queue.putOneUncancelable(io, miss) catch return;
}