Pivot Partitioning Complete Source Code
This is the complete source code for the pivot partitioning algorithm used as an example for the scatter/gather transfers feature.
The DPU code partitions its chunk of the data around a pivot:
#include <mram.h>
/* Number of 32-bit elements processed by this DPU. The host transfers
 * BUFFER_SIZE / NB_DPUS ints to and from "buffer" for each DPU, so this
 * value should match that per-DPU chunk size. */
#define DPU_BUFFER_SIZE (1 << 6)
/* Pivot value, written by the host (dpu_broadcast_to) before the launch. */
__host int pivot;
/* Output of main: metadata[0] is the length of the left partition
 * (elements <= pivot), metadata[1] the length of the right partition. */
__host uint64_t metadata[2];
/* Data in MRAM: pushed by the host, partitioned in place by main, then
 * gathered back by the host with a scatter/gather transfer. */
__mram_noinit uint32_t buffer[DPU_BUFFER_SIZE];
/* Partition this DPU's chunk of data around `pivot`, in place.
 *
 * On return, buffer[0 .. metadata[0]) holds the elements <= pivot and
 * buffer[metadata[0] .. DPU_BUFFER_SIZE) the elements > pivot;
 * metadata[1] is the length of that right partition. */
int main(void) {
  /* load the data from MRAM */
  uint32_t work_buffer[DPU_BUFFER_SIZE];
  mram_read(&buffer, &work_buffer, DPU_BUFFER_SIZE * sizeof(uint32_t));

  /* perform the pivot: scan from both ends and swap misplaced pairs.
   * The host fills the buffer with `int` values and validates them as signed
   * integers, so the elements are compared as int32_t too; a uint32_t
   * comparison would convert a negative pivot or element to a huge unsigned
   * value and misclassify it. */
  uint32_t i = 0;
  uint32_t j = DPU_BUFFER_SIZE - 1;
  while (i < j) {
    while (i < DPU_BUFFER_SIZE - 1 && (int32_t)work_buffer[i] <= pivot) {
      i++;
    }
    while (j > 0 && (int32_t)work_buffer[j] > pivot) {
      j--;
    }
    if (i < j) {
      uint32_t tmp = work_buffer[i];
      work_buffer[i] = work_buffer[j];
      work_buffer[j] = tmp;
    }
  }
  /* The left scan stops at the last index without classifying it. When every
   * element is <= pivot, i ends at DPU_BUFFER_SIZE - 1 although the whole
   * buffer belongs to the left partition: count that last element too. */
  if (i == DPU_BUFFER_SIZE - 1 && (int32_t)work_buffer[i] <= pivot) {
    i++;
  }

  /* store the metadata */
  metadata[0] = i;                   /* length of left partition */
  metadata[1] = DPU_BUFFER_SIZE - i; /* length of right partition */

  /* store the data back to MRAM */
  mram_write(&work_buffer, &buffer, DPU_BUFFER_SIZE * sizeof(uint32_t));
  return 0;
}
The DPU code is built to be executed by a single tasklet:
dpu-upmem-dpurte-clang -O3 pivot_example.c -o pivot_example
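The host code that uses scatter/gather transfers to aggregate the results of the DPUs follows, in three variants: C (pivot_example_host.c), C++ with a callback function (pivot_example_host.cpp), and C++ with a lambda (pivot_example_lambda_host.cpp):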
/* Pivot using multiple DPUs */
/* Showcases the use of scatter/gather transfers when the output arrays don't
* align. */
/* Distributes a collection of integers in MRAM, */
/* requests the DPUs to perform a pivot, */
/* then aggregates the results. */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

#include <dpu.h>
#ifndef DPU_BINARY
#define DPU_BINARY "pivot_example"
#endif
/* Number of elements in the buffer that we want to pivot
 * (256 ints, i.e. 1 KByte when int is 4 bytes). */
#define BUFFER_SIZE (1 << 8)
/* Number of DPUs to use. */
#define NB_DPUS 4
/* Range of the random numbers. */
#define MAX_RANDOM 1000
/* Number of blocks */
#define NB_BLOCKS 2 // we're going to partition the buffer in 2 blocks
/* Every DPU receives BUFFER_SIZE / NB_DPUS elements, so the buffer has to
 * split evenly between the DPUs; otherwise trailing elements are never sent. */
#if BUFFER_SIZE % NB_DPUS != 0
#error "BUFFER_SIZE must be a multiple of NB_DPUS"
#endif
/* Store `size` pseudo-random integers in [1, MAX_RANDOM] into `buffer`.
 * The generator is reseeded with a fixed value, so every call (and every run)
 * produces the same sequence. */
void fill_random_buffer(int *buffer, size_t size) {
  srand(42);
  size_t idx = 0;
  while (idx < size) {
    buffer[idx] = 1 + rand() % MAX_RANDOM;
    ++idx;
  }
}
/* Allocate a table of `x` rows, each holding `y` elements of `type_size`
 * bytes. The result is an array of `x` row pointers; cast it to the desired
 * `T **` and release it with bidimensional_free(array, x).
 *
 * Returns NULL when an allocation fails. Rows allocated before the failure
 * are released first, so nothing leaks. A zero-byte row may legitimately be
 * NULL (malloc(0)) and is not treated as a failure. */
void *bidimensional_malloc(size_t x, size_t y, size_t type_size) {
  void **array = (void **)malloc(x * sizeof(void *));
  if (array == NULL) {
    return NULL;
  }
  const size_t row_bytes = y * type_size;
  for (size_t i = 0; i < x; i++) {
    array[i] = malloc(row_bytes);
    if (array[i] == NULL && row_bytes != 0) {
      /* Roll back the rows allocated so far. */
      while (i > 0) {
        free(array[--i]);
      }
      free(array);
      return NULL;
    }
  }
  return array;
}
void bidimensional_free(void *array, size_t x) {
for (size_t i = 0; i < x; i++) {
free(((void **)array)[i]);
}
free(array);
}
void populate_mram(struct dpu_set_t set, int *buffer) {
struct dpu_set_t dpu;
uint32_t each_dpu;
/* Distribute the buffer across the DPUs. */
DPU_FOREACH(set, dpu, each_dpu) {
DPU_ASSERT(
dpu_prepare_xfer(dpu, &buffer[each_dpu * BUFFER_SIZE / NB_DPUS]));
}
DPU_ASSERT(dpu_push_xfer(set, DPU_XFER_TO_DPU, "buffer", 0,
BUFFER_SIZE / NB_DPUS * sizeof(int),
DPU_XFER_DEFAULT));
}
size_t **get_metadata(struct dpu_set_t set, uint32_t nr_dpus) {
struct dpu_set_t dpu;
uint32_t each_dpu;
size_t **metadata = bidimensional_malloc(nr_dpus, 2, sizeof(size_t));
DPU_FOREACH(set, dpu, each_dpu) {
DPU_ASSERT(dpu_prepare_xfer(dpu, metadata[each_dpu]));
}
DPU_ASSERT(dpu_push_xfer(set, DPU_XFER_FROM_DPU, "metadata", 0,
sizeof(**metadata) * 2, DPU_XFER_DEFAULT));
return metadata;
}
/* Total length of the left partition: the sum of metadata[k][0] over the
 * NB_DPUS rows of the metadata table. */
size_t get_left_length(size_t **metadata) {
  size_t total = 0;
  for (size_t **row = metadata; row != metadata + NB_DPUS; ++row) {
    total += (*row)[0];
  }
  return total;
}
/* Compute the addresses of inbound blocks in the output buffer.
 *
 * The left blocks of DPU 0, 1, ... are laid out back to back from the start
 * of `out_buffer`, and the right blocks back to back from
 * `out_buffer[lower_length]`. block_addresses[k][b] receives the destination
 * of block b (0 = left, 1 = right) of the k-th DPU. */
void compute_block_addresses(
    size_t **metadata,          /* [in] array of block lengths */
    uint8_t ***block_addresses, /* [out] indexes to store the blocks */
    int *out_buffer,            /* [in] output buffer */
    size_t lower_length         /* [in] length of the lower partition */
) {
  uint8_t **first = block_addresses[0];
  first[0] = (uint8_t *)out_buffer;
  first[1] = (uint8_t *)(out_buffer + lower_length);
  for (int dpu = 1; dpu < NB_DPUS; dpu++) {
    uint8_t **prev = block_addresses[dpu - 1];
    uint8_t **cur = block_addresses[dpu];
    for (int blk = 0; blk < NB_BLOCKS; blk++) {
      /* Each block starts right after the same block of the previous DPU. */
      cur[blk] = prev[blk] + metadata[dpu - 1][blk] * sizeof(*out_buffer);
    }
  }
}
/* Arguments handed to get_block through get_block_t.args. Both tables are
 * indexed [dpu_index][block_index]; the structure does not own them. */
struct sg_xfer_context {
  size_t **metadata;          /* [in] array of block lengths */
  uint8_t ***block_addresses; /* [in] indexes to store the next block */
};
typedef struct sg_xfer_context sg_xfer_context;
/* Scatter/gather callback describing block `block_index` of DPU `dpu_index`.
 * `args` points to an sg_xfer_context. For block_index < NB_BLOCKS, `out`
 * receives the host destination address and the byte length of the block and
 * true is returned; any other block_index yields false (no such block). */
bool get_block(struct sg_block_info *out, uint32_t dpu_index,
               uint32_t block_index, void *args) {
  if (block_index < NB_BLOCKS) {
    const sg_xfer_context *ctx = (const sg_xfer_context *)args;
    out->addr = ctx->block_addresses[dpu_index][block_index];
    out->length = ctx->metadata[dpu_index][block_index] * sizeof(int);
    return true;
  }
  return false;
}
/* Validate the partition.
 *
 * Checks that buffer[0 .. lower_length) holds only values <= pivot and
 * buffer[lower_length .. BUFFER_SIZE) only values > pivot. On the first
 * violation a message is printed to stderr and the program aborts. Unlike
 * assert(), these checks stay active when NDEBUG is defined, so the success
 * message is never printed for an unchecked result. */
void validate_partition(int pivot, const int *buffer, size_t lower_length) {
  if (lower_length > (size_t)BUFFER_SIZE) {
    fprintf(stderr, "Invalid lower_length = %zu (buffer holds %d elements)\n",
            lower_length, BUFFER_SIZE);
    abort();
  }
  for (size_t i = 0; i < lower_length; i++) {
    if (buffer[i] > pivot) {
      fprintf(stderr, "Left partition: buffer[%zu] = %d is > pivot %d\n", i,
              buffer[i], pivot);
      abort();
    }
  }
  for (size_t i = lower_length; i < BUFFER_SIZE; i++) {
    if (buffer[i] <= pivot) {
      fprintf(stderr, "Right partition: buffer[%zu] = %d is <= pivot %d\n", i,
              buffer[i], pivot);
      abort();
    }
  }
  printf("Succeeded with lower_length = %zu\n", lower_length);
}
/* Partition BUFFER_SIZE random integers around MAX_RANDOM / 2 with NB_DPUS
 * DPUs, gather every DPU's left and right blocks back into `buffer` with one
 * scatter/gather transfer, and validate the result. */
int main(void) {
  struct dpu_set_t set;
  /* Generate random data */
  int buffer[BUFFER_SIZE];
  fill_random_buffer(buffer, BUFFER_SIZE);
  int pivot = MAX_RANDOM / 2;
  /* Initialize and run */
  DPU_ASSERT(dpu_alloc(NB_DPUS, "sgXferEnable=true", &set));
  DPU_ASSERT(dpu_load(set, DPU_BINARY, NULL));
  populate_mram(set, buffer);
  /* Checked like every other DPU call: a failed broadcast would otherwise go
   * unnoticed and the DPUs would run without the intended pivot. */
  DPU_ASSERT(dpu_broadcast_to(set, "pivot", 0, &pivot, sizeof(pivot),
                              DPU_XFER_DEFAULT));
  DPU_ASSERT(dpu_launch(set, DPU_SYNCHRONOUS));
  /* Retrieve metadata and compute length of the left partition. */
  size_t **metadata = get_metadata(set, NB_DPUS);
  size_t lower_length = get_left_length(metadata);
  /* Compute where to store the incoming blocks. */
  uint8_t ***block_addresses =
      bidimensional_malloc(NB_DPUS, NB_BLOCKS, sizeof(uint8_t *));
  compute_block_addresses(metadata, block_addresses, buffer, lower_length);
  /* Retrieve the result. */
  sg_xfer_context sc_args = {.metadata = metadata,
                             .block_addresses = block_addresses};
  get_block_t get_block_info = {
      .f = &get_block, .args = &sc_args, .args_size = sizeof(sc_args)};
  DPU_ASSERT(dpu_push_sg_xfer(set, DPU_XFER_FROM_DPU, "buffer", 0,
                              BUFFER_SIZE / NB_DPUS * sizeof(int),
                              &get_block_info, DPU_SG_XFER_DEFAULT));
  /* Validate the results. */
  validate_partition(pivot, buffer, lower_length);
  bidimensional_free(metadata, NB_DPUS);
  bidimensional_free(block_addresses, NB_DPUS);
  DPU_ASSERT(dpu_free(set));
  return 0;
}
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <dpu>
#include <iostream>
#include <numeric>
#include <random>
#include <vector>
using dpu::DpuError;
using dpu::DpuSet;
using dpu::DpuSetOps;
#ifndef DPU_BINARY
#define DPU_BINARY "pivot_example"
#endif
/* Number of elements in the buffer that we want to pivot
 * (256 values, i.e. 1 KByte for 4-byte elements). */
constexpr size_t BUFFER_SIZE(1 << 8);
/* Number of DPUs to use. */
constexpr size_t NB_DPUS(4);
/* Range of the random numbers. */
template <typename T> constexpr T MAX_RANDOM(1000);
/* Number of blocks */
constexpr size_t
    NB_BLOCKS(2); // we're going to partition the buffer in 2 blocks
/* Every DPU receives BUFFER_SIZE / NB_DPUS elements, so the buffer has to
 * split evenly between the DPUs; otherwise trailing elements are never sent. */
static_assert(BUFFER_SIZE % NB_DPUS == 0,
              "BUFFER_SIZE must be a multiple of NB_DPUS");
/* Build a vector of `size` pseudo-random values uniformly drawn from
 * [1, MAX_RANDOM<T>]. The engine uses a fixed seed, so the content is the
 * same on every run. */
template <typename T> static auto generate_random_vector(size_t size) {
  std::default_random_engine engine(static_cast<size_t>(42));
  std::uniform_int_distribution<T> dist(1, MAX_RANDOM<T>);
  std::vector<T> values;
  values.reserve(size);
  for (size_t k = 0; k < size; ++k) {
    values.push_back(dist(engine));
  }
  return values;
}
/* Split `random_buffer` into one equal chunk per DPU of `dpuSet` and copy the
 * k-th chunk to the "buffer" symbol of the k-th DPU. Trailing elements that do
 * not fill a whole chunk are not transferred. */
template <typename T>
static auto populate_mram(DpuSet &dpuSet, const std::vector<T> &random_buffer) {
  const auto nb_dpus = dpuSet.dpus().size();
  const auto buffer_size = random_buffer.size() / nb_dpus;
  std::vector<std::vector<T>> buffer(nb_dpus);
  /* Bound the loop by the DPUs actually in the set (not NB_DPUS) so it can
   * never index past the end of `buffer`. */
  for (size_t i = 0; i < nb_dpus; ++i) {
    const auto first =
        random_buffer.begin() + static_cast<std::ptrdiff_t>(i * buffer_size);
    buffer[i].assign(first, first + static_cast<std::ptrdiff_t>(buffer_size));
  }
  dpuSet.copy("buffer", buffer, buffer_size * sizeof(T));
}
/* Read the "metadata" array of every DPU into an NB_DPUS x NB_BLOCKS table:
 * row k holds the left ([0]) and right ([1]) partition lengths of DPU k.
 * NOTE(review): entries are size_t while the DPU declares uint64_t; this
 * assumes a 64-bit size_t on the host. */
static auto get_metadata(DpuSetOps &dpu) {
  const std::vector<size_t> row(NB_BLOCKS);
  std::vector<std::vector<size_t>> lengths(NB_DPUS, row);
  dpu.copy(lengths, "metadata");
  return lengths;
}
/* Total length of the left partition: the sum of the first entry (left-block
 * length) of every metadata row. The table is only read, so it is taken by
 * const reference, which also accepts const tables and temporaries. */
static auto get_left_length(const std::vector<std::vector<size_t>> &metadata) {
  constexpr size_t initial_value = 0;
  return std::accumulate(
      metadata.cbegin(), metadata.cend(), initial_value,
      [](size_t acc, const std::vector<size_t> &v) { return acc + v[0]; });
}
/* Compute the addresses of inbound blocks in the output buffer.
 *
 * Left blocks of DPU 0, 1, ... are placed back to back from the start of
 * `out_vector`, right blocks back to back from index `lower_length`.
 * result[k][b] is the destination of block b (0 = left, 1 = right) of DPU k.
 * One row is produced per row of `metadata`; empty metadata yields an empty
 * result. */
template <typename T>
static auto compute_block_addresses(
    const std::vector<std::vector<size_t>>
        &metadata,              /* [in] array of block lengths */
    std::vector<T> &out_vector, /* [in] output vector */
    const size_t lower_length   /* [in] length of the lower partition */
) {
  const size_t nb_dpus = metadata.size();
  std::vector<std::vector<uint8_t *>> block_addresses(nb_dpus);
  if (nb_dpus == 0) {
    return block_addresses;
  }
  /* `size_t{0}` rather than `0UL`: both list elements must have the same type
   * and size_t is not unsigned long everywhere. `data() + start` stays valid
   * pointer arithmetic even when lower_length == out_vector.size(). */
  for (const size_t start : {size_t{0}, lower_length}) {
    block_addresses[0].push_back(
        reinterpret_cast<uint8_t *>(out_vector.data() + start));
  }
  /* Each DPU's blocks start right after the same blocks of the previous DPU. */
  for (size_t dpu = 1; dpu < nb_dpus; ++dpu) {
    const auto &previous = block_addresses[dpu - 1];
    const auto &lengths = metadata[dpu - 1];
    auto &current = block_addresses[dpu];
    current.reserve(previous.size());
    for (size_t blk = 0; blk < previous.size(); ++blk) {
      current.push_back(previous[blk] + lengths[blk] * sizeof(T));
    }
  }
  return block_addresses;
}
/* User structure that stores the get_block function arguments.
 * It only refers to the two tables (it owns neither), so both must outlive
 * the scatter/gather copy that uses it. Both are indexed
 * [dpu_index][block_index]. */
struct sg_xfer_context {
  /* [in] array of block lengths */
  const std::vector<std::vector<size_t>> &metadata;
  /* [in] array of block addresses */
  const std::vector<std::vector<uint8_t *>> &block_addresses;
};
/* Callback function that returns the block information for a given DPU and
* block index. */
template <typename T>
static auto get_block(struct sg_block_info *out, uint32_t dpu_index,
uint32_t block_index, void *args) {
if (block_index >= NB_BLOCKS) {
return false;
}
/* Unpack the arguments */
auto *sc_args = reinterpret_cast<sg_xfer_context *>(args);
auto length = sc_args->metadata[dpu_index][block_index];
/* Set the output block */
out->length = length * sizeof(T);
out->addr = sc_args->block_addresses[dpu_index][block_index];
return true;
}
/* Validate the partition.
 *
 * Asserts that buffer[0, lower_length) holds only values <= pivot and
 * buffer[lower_length, end) only values > pivot, then prints a success
 * message. Either range may be empty (lower_length == 0 when every value is
 * above the pivot, lower_length == buffer.size() when none is). std::all_of
 * handles that, whereas dereferencing max_element/min_element of an empty
 * range is undefined behavior. Like any assert, the checks are compiled out
 * under NDEBUG. */
template <typename T>
static auto validate_partition(T pivot, const std::vector<T> &buffer,
                               size_t lower_length) {
  assert(lower_length <= buffer.size());
  const auto split =
      buffer.begin() + static_cast<std::ptrdiff_t>(lower_length);
  assert(std::all_of(buffer.begin(), split,
                     [pivot](const T &value) { return value <= pivot; }));
  assert(std::all_of(split, buffer.end(),
                     [pivot](const T &value) { return value > pivot; }));
  static_cast<void>(split); // only used by the asserts
  std::cout << "Succeeded with lower_length = " << lower_length << std::endl;
}
/* Partition BUFFER_SIZE random values around MAX_RANDOM / 2 with NB_DPUS DPUs,
 * gather the blocks back with one scatter/gather copy and validate them.
 * Returns 0 on success and 1 when a DPU operation throws a DpuError. */
auto main() -> int {
  try {
    using T = int;
    /* Generate random data */
    auto buffer = generate_random_vector<T>(BUFFER_SIZE);
    std::vector<T> pivot{MAX_RANDOM<T> / 2};
    /* Initialize and run */
    auto dpuSet = DpuSet::allocate(NB_DPUS, "sgXferEnable=true");
    dpuSet.load(DPU_BINARY);
    populate_mram(dpuSet, buffer);
    dpuSet.copy("pivot", pivot);
    dpuSet.exec();
    /* Retrieve metadata and compute length of the left partition. */
    auto metadata = get_metadata(dpuSet);
    auto lower_length = get_left_length(metadata);
    /* Compute the addresses of inbound blocks in the output buffer. */
    auto block_addresses =
        compute_block_addresses(metadata, buffer, lower_length);
    /* Retrieve the result. */
    sg_xfer_context sc_args{metadata, block_addresses};
    get_block_t get_block_info{get_block<T>, &sc_args, sizeof(sc_args)};
    dpuSet.copyScatterGather(get_block_info, BUFFER_SIZE / NB_DPUS * sizeof(T),
                             "buffer");
    /* Validate the result. */
    validate_partition(pivot[0], buffer, lower_length);
  } catch (const DpuError &e) {
    std::cerr << e.what() << std::endl;
    /* Report the failure through the exit status instead of returning 0. */
    return 1;
  }
  return 0;
}
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <dpu>
#include <functional>
#include <iostream>
#include <memory>
#include <numeric>
#include <random>
#include <vector>
using dpu::DpuError;
using dpu::DpuSet;
using dpu::DpuSetOps;
#ifndef DPU_BINARY
#define DPU_BINARY "pivot_example"
#endif
/* Number of elements in the buffer that we want to pivot
 * (256 values, i.e. 1 KByte for 4-byte elements). */
constexpr size_t BUFFER_SIZE(1 << 8);
/* Number of DPUs to use. */
constexpr size_t NB_DPUS(4);
/* Range of the random numbers. */
template <typename T> constexpr T MAX_RANDOM(1000);
/* Number of blocks */
constexpr size_t
    NB_BLOCKS(2); // we're going to partition the buffer in 2 blocks
/* Every DPU receives BUFFER_SIZE / NB_DPUS elements, so the buffer has to
 * split evenly between the DPUs; otherwise trailing elements are never sent. */
static_assert(BUFFER_SIZE % NB_DPUS == 0,
              "BUFFER_SIZE must be a multiple of NB_DPUS");
/* Return `size` pseudo-random values drawn uniformly from [1, MAX_RANDOM<T>].
 * A fixed seed makes the generated data identical from one run to the next. */
template <typename T> static auto generate_random_vector(size_t size) {
  std::default_random_engine engine(static_cast<size_t>(42));
  std::uniform_int_distribution<T> dist(1, MAX_RANDOM<T>);
  std::vector<T> values(size);
  for (auto &slot : values) {
    slot = dist(engine);
  }
  return values;
}
/* Split `random_buffer` into one equal chunk per DPU of `dpuSet` and copy the
 * k-th chunk to the "buffer" symbol of the k-th DPU. Trailing elements that do
 * not fill a whole chunk are not transferred. */
template <typename T>
static auto populate_mram(DpuSet &dpuSet, const std::vector<T> &random_buffer) {
  const auto nb_dpus = dpuSet.dpus().size();
  const auto buffer_size = random_buffer.size() / nb_dpus;
  std::vector<std::vector<T>> buffer(nb_dpus);
  /* Bound the loop by the DPUs actually in the set (not NB_DPUS) so it can
   * never index past the end of `buffer`. */
  for (size_t i = 0; i < nb_dpus; ++i) {
    const auto first =
        random_buffer.begin() + static_cast<std::ptrdiff_t>(i * buffer_size);
    buffer[i].assign(first, first + static_cast<std::ptrdiff_t>(buffer_size));
  }
  dpuSet.copy("buffer", buffer, buffer_size * sizeof(T));
}
/* Fetch every DPU's "metadata" array as an NB_DPUS x NB_BLOCKS table whose
 * row k holds the left ([0]) and right ([1]) partition lengths of DPU k.
 * NOTE(review): entries are size_t while the DPU declares uint64_t; this
 * assumes a 64-bit size_t on the host. */
static auto get_metadata(DpuSetOps &dpu) {
  std::vector<std::vector<size_t>> lengths(NB_DPUS,
                                           std::vector<size_t>(NB_BLOCKS));
  dpu.copy(lengths, "metadata");
  return lengths;
}
/* Total length of the left partition: the sum of the first entry (left-block
 * length) of every metadata row. The table is only read, so it is taken by
 * const reference, which also accepts const tables and temporaries. */
static auto get_left_length(const std::vector<std::vector<size_t>> &metadata) {
  constexpr size_t initial_value = 0;
  return std::accumulate(
      metadata.cbegin(), metadata.cend(), initial_value,
      [](size_t acc, const std::vector<size_t> &v) { return acc + v[0]; });
}
/* Compute the addresses of inbound blocks in the output buffer.
 *
 * Left blocks of DPU 0, 1, ... are placed back to back from the start of
 * `out_vector`, right blocks back to back from index `lower_length`.
 * result[k][b] is the destination of block b (0 = left, 1 = right) of DPU k.
 * One row is produced per row of `metadata`; empty metadata yields an empty
 * result. */
template <typename T>
static auto compute_block_addresses(
    const std::vector<std::vector<size_t>>
        &metadata,              /* [in] array of block lengths */
    std::vector<T> &out_vector, /* [in] output vector */
    const size_t lower_length   /* [in] length of the lower partition */
) {
  const size_t nb_dpus = metadata.size();
  std::vector<std::vector<uint8_t *>> block_addresses(nb_dpus);
  if (nb_dpus == 0) {
    return block_addresses;
  }
  /* `size_t{0}` rather than `0UL`: both list elements must have the same type
   * and size_t is not unsigned long everywhere. `data() + start` stays valid
   * pointer arithmetic even when lower_length == out_vector.size(). */
  for (const size_t start : {size_t{0}, lower_length}) {
    block_addresses[0].push_back(
        reinterpret_cast<uint8_t *>(out_vector.data() + start));
  }
  /* Each DPU's blocks start right after the same blocks of the previous DPU. */
  for (size_t dpu = 1; dpu < nb_dpus; ++dpu) {
    const auto &previous = block_addresses[dpu - 1];
    const auto &lengths = metadata[dpu - 1];
    auto &current = block_addresses[dpu];
    current.reserve(previous.size());
    for (size_t blk = 0; blk < previous.size(); ++blk) {
      current.push_back(previous[blk] + lengths[blk] * sizeof(T));
    }
  }
  return block_addresses;
}
/* Validate the partition.
 *
 * Asserts that buffer[0, lower_length) holds only values <= pivot and
 * buffer[lower_length, end) only values > pivot, then prints a success
 * message. Either range may be empty (lower_length == 0 when every value is
 * above the pivot, lower_length == buffer.size() when none is). std::all_of
 * handles that, whereas dereferencing max_element/min_element of an empty
 * range is undefined behavior. Like any assert, the checks are compiled out
 * under NDEBUG. */
template <typename T>
static auto validate_partition(T pivot, const std::vector<T> &buffer,
                               size_t lower_length) {
  assert(lower_length <= buffer.size());
  const auto split =
      buffer.begin() + static_cast<std::ptrdiff_t>(lower_length);
  assert(std::all_of(buffer.begin(), split,
                     [pivot](const T &value) { return value <= pivot; }));
  assert(std::all_of(split, buffer.end(),
                     [pivot](const T &value) { return value > pivot; }));
  static_cast<void>(split); // only used by the asserts
  std::cout << "Succeeded with lower_length = " << lower_length << std::endl;
}
/* Partition BUFFER_SIZE random values around MAX_RANDOM / 2 with NB_DPUS DPUs,
 * gather the blocks back with one scatter/gather copy driven by a lambda, and
 * validate them. Returns 0 on success and 1 when a DPU operation throws a
 * DpuError. */
auto main() -> int {
  try {
    using T = int;
    /* Generate random data */
    auto buffer = generate_random_vector<T>(BUFFER_SIZE);
    std::vector<T> pivot{MAX_RANDOM<T> / 2};
    /* Initialize and run */
    auto dpuSet = DpuSet::allocate(NB_DPUS, "sgXferEnable=true");
    dpuSet.load(DPU_BINARY);
    populate_mram(dpuSet, buffer);
    dpuSet.copy("pivot", pivot);
    dpuSet.exec();
    /* Retrieve metadata and compute length of the left partition. */
    auto metadata = get_metadata(dpuSet);
    auto lower_length = get_left_length(metadata);
    /* Compute the addresses of inbound blocks in the output buffer. */
    auto block_addresses =
        compute_block_addresses(metadata, buffer, lower_length);
    /* Retrieve the result: the lambda plays the role of the block callback
     * and reads the tables it captured by reference. */
    auto get_block = [&metadata, &block_addresses](struct sg_block_info *out,
                                                   uint32_t dpu_index,
                                                   uint32_t block_index) {
      if (block_index >= NB_BLOCKS) {
        return false;
      }
      /* Set the output block */
      out->length = metadata[dpu_index][block_index] * sizeof(T);
      out->addr = block_addresses[dpu_index][block_index];
      return true;
    };
    dpuSet.copyScatterGather(get_block, BUFFER_SIZE / NB_DPUS * sizeof(T),
                             "buffer");
    /* Validate the result. */
    validate_partition(pivot[0], buffer, lower_length);
  } catch (const DpuError &e) {
    std::cerr << e.what() << std::endl;
    /* Report the failure through the exit status instead of returning 0. */
    return 1;
  }
  return 0;
}
The three host programs can be compiled with:
gcc -O3 --std=c99 -o pivot_example_host pivot_example_host.c -g `dpu-pkg-config --cflags --libs dpu`
g++ -O3 -o pivot_example_host_cpp pivot_example_host.cpp -g `dpu-pkg-config --cflags --libs dpu`
g++ -O3 -o pivot_example_host_lambda_cpp pivot_example_lambda_host.cpp -g `dpu-pkg-config --cflags --libs dpu`