Creating a New Algorithm
In this section the basic steps for creating an algorithm for horizontally partitioned data are explained.
The final code of this tutorial is published on GitHub. The algorithm is also published in our Docker registry: harbor2.vantage6.ai/demo/average
It is assumed that it is mathematically possible to create a federated version of the algorithm you want to use. In the following sections we create a federated algorithm to compute the average of a distributed dataset. An overview of the steps that we are going through:
1. Mathematically decompose the model
2. Implement and test locally
3. Vantage6 algorithm wrapper
4. Dockerize and push to a registry

The mathematical problem

We want to know the average of column X from a dataset Q which contains n samples. Dataset Q is horizontally partitioned into datasets
$$A = [a_1, a_2, \dots, a_j] = [q_1, q_2, \dots, q_j]$$
and
$$B = [b_1, b_2, \dots, b_k] = [q_{j+1}, q_{j+2}, \dots, q_n]$$
The average of dataset Q is computed as:
$$Q_{mean} = \frac{1}{n} \sum\limits_{i=1}^{n} q_i = \frac{q_1 + q_2 + \dots + q_n}{n}$$
Now we would like to compute $Q_{mean}$ from datasets A and B. This can be computed as:
$$Q_{mean} = \frac{(a_1 + a_2 + \dots + a_j) + (b_1 + b_2 + \dots + b_k)}{j + k} = \frac{\sum A + \sum B}{j + k}$$
We need both the number of samples in each dataset and the total sum of each dataset. Then we can compute the global average of datasets A and B.
We cannot simply compute the average on each node and then average those values, as this would be mathematically incorrect in general. It only gives the right answer if datasets A and B contain the same number of samples.
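The difference can be checked with a small worked example. The values below are made up for illustration: A has three samples and B has one.

```python
# Hypothetical partitions of Q; the values are made up for illustration.
A = [1, 2, 3]   # j = 3 samples
B = [10]        # k = 1 sample

# Correct: combine the sums and the counts of both partitions.
global_mean = (sum(A) + sum(B)) / (len(A) + len(B))

# Incorrect: average the two local averages.
mean_of_means = (sum(A) / len(A) + sum(B) / len(B)) / 2

print(global_mean)    # 4.0
print(mean_of_means)  # 6.0
```

The single sample in B gets far too much weight when the local averages are averaged, which is why each node has to report its sum and its count.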

👨‍💻 Implementation

In this example we use Python; however, you are free to choose any language. The only requirements are that the language 1) is able to create HTTP requests, and 2) is able to read and write files.
However, if you use a different language you are not able to use our wrapper. Reach out to us on Discord to discuss how this works.
Now that we have figured out the maths, we can translate it to an implementation. A federated algorithm consists of two parts:
1. Central part of the algorithm, which is responsible for combining the partial results from the data stations. In our case that would be dividing the sum of the totals by the total number of observations.
2. Federated part of the algorithm, which is responsible for creating the partial results. In our case this would be computing the total (=sum) and the number of observations.
The central part of the algorithm can either be run on the researcher's own machine or in a master container which runs on a node. The latter is the preferred method.
If researchers run this part themselves, they need a proper setup to do so (i.e. Python 3.5+ and the necessary dependencies). This is useful when developing new algorithms.

💕 Federated Part

The node that runs this part contains a CSV file with a single column, numbers, which we want to use to compute the global mean. We assume that this column has no NaN values.
```python
import pandas


def federated_part(path, column_name="numbers"):
    """Compute the sum and number of observations of a column"""

    # extract the column numbers from the CSV
    numbers = pandas.read_csv(path)[column_name]

    # compute the sum, and count number of rows
    local_sum = numbers.sum()
    local_count = len(numbers)

    # return the values as a dict
    return {
        "sum": local_sum,
        "count": local_count
    }
```

❀ The central algorithm

The central algorithm receives the sums and counts from all sites and combines these to a global mean. This could be from one or more sites.
```python
def central_part(node_outputs):
    """Combine the partial results to a global average"""
    global_sum = 0
    global_count = 0
    for output in node_outputs:
        global_sum += output["sum"]
        global_count += output["count"]

    return {"average": global_sum / global_count}
```

🧪 Local testing

To test, simply create two datasets A and B, both having a numerical column numbers. Then run the following:
```python
outputs = [
    federated_part("path/to/dataset/A"),
    federated_part("path/to/dataset/B")
]
Q_average = central_part(outputs)["average"]
print(f"global average = {Q_average}.")
```
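One way to create the two test datasets is sketched below. The file names a.csv and b.csv, the temporary directory and the values are assumptions made for illustration; only the column name numbers comes from the text above. The expected global average is computed directly so that it can be compared with the output of central_part.

```python
import csv
import tempfile
from pathlib import Path


def write_dataset(path, values, column_name="numbers"):
    """Write a single-column CSV file with a header row."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([column_name])
        writer.writerows([value] for value in values)


# Hypothetical locations and values, chosen for illustration only.
folder = Path(tempfile.mkdtemp())
path_a = folder / "a.csv"
path_b = folder / "b.csv"
values_a = [1, 2, 3]
values_b = [10]

write_dataset(path_a, values_a)
write_dataset(path_b, values_b)

# Value that the federated computation should reproduce.
expected_average = sum(values_a + values_b) / len(values_a + values_b)
print(f"expected global average = {expected_average}")
```

Passing path_a and path_b to federated_part and combining the outputs with central_part should then give the same value.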

🌯 Algorithm wrapper

A good starting point would be to use the boilerplate from our GitHub. This section explains the steps needed to arrive at this boilerplate and provides some background information.
Now that we have a federated implementation of our algorithm we need to incorporate it in the vantage6 infrastructure. The infrastructure handles the communication with the server and provides data access to the algorithm.
The algorithm consumes a file containing the input. This contains both the name of the method to be triggered and the arguments provided to the method. The algorithm also has access to a CSV file (in the future this could also be a database) on which the algorithm can run. Finally, when the algorithm is finished, it writes the output to a different file.
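The flow described above can be sketched as follows. This is a simplified illustration and not the actual vantage6 wrapper: the JSON encoding, the file names, the run_algorithm function and the row_count method are all assumptions made for this example.

```python
import csv
import json
import tempfile
from pathlib import Path


def row_count(rows):
    """Hypothetical algorithm method: count the rows of the local data."""
    return {"count": len(rows)}


# Methods that may be triggered through the input file (hypothetical).
METHODS = {"row_count": row_count}


def run_algorithm(input_file, database_file, output_file):
    """Read the input, run the requested method on the data, write the output."""
    # 1. The input names the method and carries its arguments.
    task = json.loads(Path(input_file).read_text())
    method = METHODS[task["method"]]

    # 2. The local data is read from a CSV file.
    with open(database_file, newline="") as f:
        rows = list(csv.DictReader(f))

    # 3. The result is written to a separate output file.
    result = method(rows, **task.get("kwargs", {}))
    Path(output_file).write_text(json.dumps(result))


# Usage with temporary files standing in for the files provided by the node.
folder = Path(tempfile.mkdtemp())
(folder / "input.json").write_text(json.dumps({"method": "row_count", "kwargs": {}}))
(folder / "data.csv").write_text("numbers\n1\n2\n3\n")
run_algorithm(folder / "input.json", folder / "data.csv", folder / "output.json")
print((folder / "output.json").read_text())
```

In the tutorial itself this plumbing is handled by the wrapper, so the algorithm code only has to provide the methods.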
The central part of the algorithm needs to be able to create (sub)tasks. These subtasks are responsible for executing the federated part of the algorithm. The central part of the algorithm can either be executed on the machine of the researcher or on one of the nodes in the vantage6 network. In this example we only show the case in which one of the nodes executes the central part of the algorithm. The node provides the algorithm with a JWT token so that the central part of the algorithm has access to the server to post these subtasks.
In this example the node uses a CSV file as database 📔. There are implementations that use traditional databases and triple stores. We expect to support these use cases better in the future.

📂 Package Structure

The algorithm needs to be structured as a package. This way the algorithm can be installed within the Docker image. The minimal file structure would be:
```
project_folder
├── Dockerfile
├── setup.py
└── algorithm_pkg
    └── __init__.py
```
We also recommend adding a README.md, LICENSE and requirements.txt to the project_folder.

setup.py

Contains the setup method to create a package from your algorithm code. Here you specify some details about your package and the dependencies it requires.
```python
from os import path
from codecs import open
from setuptools import setup, find_packages

# we're using a README.md, if you do not have this in your folder, simply
# replace this with a string.
here = path.abspath(path.dirname(__file__))
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
    long_description = f.read()

# Here you specify the meta-data of your package. The `name` argument is
# needed in some other steps.
setup(
    name='v6-average-py',
    version="1.0.0",
    description='vantage6 average',
    long_description=long_description,
    long_description_content_type='text/markdown',
    url='https://github.com/IKNL/v6-average-py',
    packages=find_packages(),
    python_requires='>=3.6',
    install_requires=[
        'vantage6-client',
        # list your dependencies here:
        # pandas, ...
    ]
)
```
The setup.py above is sufficient in most cases. However, if you want to do more advanced things (like adding static data or a CLI), you can use the extra arguments of setup.

​
🐳 Dockerfile

Contains the recipe for building the Docker image. Typically you only have to change the argument PKG_NAME to the name of your package. This name should be the same as the name you specified in the setup.py. In our case that would be v6-average-py.
```dockerfile
# This specifies our base image. This base image contains some commonly used
# dependencies and an install of all vantage6 packages. You can specify a
# different image here (e.g. python:3). In that case it is important that
# `vantage6-client` is a dependency of your project as this contains the wrapper
# we are using in this example.
FROM harbor.vantage6.ai/algorithms/algorithm-base

# Change this to the package name of your project. This needs to be the same
# as what you specified for the name in the `setup.py`.
ARG PKG_NAME="v6-average-py"

# This will install your algorithm into this image.
COPY . /app
RUN pip install /app

# This will run your algorithm when the Docker container is started. The
# wrapper takes care of the IO handling (communication between node and
# algorithm). You don't need to change anything here.
ENV PKG_NAME=${PKG_NAME}
CMD python -c "from vantage6.tools.docker_wrapper import docker_wrapper; docker_wrapper('${PKG_NAME}')"
```

__init__.py

This contains the code for your algorithm. It is possible to split this into multiple files; however, the methods that should be available to the researcher must be in this file. You can do that by simply importing them into this file (e.g. `from .average import my_nested_method`).
We can distinguish two types of methods that a user can trigger:

| name | description | prefix | arguments |
|---|---|---|---|
| master | Central part of the algorithm. Receives a client as argument which provides an interface to the central server. This way the master can create tasks and collect their results. | (none) | `(client, data, *args, **kwargs)` |
| Remote procedure call | Consumes the data at the node to compute the partial. | `RPC_` | `(data, *args, **kwargs)` |

The client the master method receives is a ContainerClient, which is different from the client you use as a user.
Everything that these methods return is sent back to the central server. The returned value should never contain any privacy-sensitive information.
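The naming convention from the table can be illustrated with a small dispatcher. This is a sketch of the convention only, not the wrapper's implementation: resolve_method and the stand-in module are hypothetical, and it assumes that the task input carries a method name without prefix plus an optional master flag.

```python
from types import SimpleNamespace


def resolve_method(module, input_):
    """Return the function in `module` that the task input refers to."""
    name = input_["method"]
    if not input_.get("master"):
        # Remote procedure calls are looked up with the RPC_ prefix.
        name = f"RPC_{name}"
    return getattr(module, name)


# Stand-in for an algorithm package with one method of each type.
algorithm = SimpleNamespace(
    master=lambda client, data, *args, **kwargs: "central part",
    RPC_average_partial=lambda data, *args, **kwargs: "partial",
)

central = resolve_method(algorithm, {"master": 1, "method": "master"})
partial = resolve_method(algorithm, {"method": "average_partial"})
print(central(None, None))  # central part
print(partial(None))        # partial
```

Under this convention a user never writes the RPC_ prefix; it only appears in the function names inside the package.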
For our average algorithm the implementation will look as follows:
```python
import time

from vantage6.tools.util import info


def master(client, data, column_name):
    """Combine partials to global model

    First we collect the parties that participate in the collaboration.
    Then we send a task to all the parties to compute their partial (the
    row count and the column sum). Then we wait for the results to be
    ready. Finally when the results are ready, we combine them to a
    global average.

    Note that the master method also receives the (local) data of the
    node. In most use cases this data argument is not used.

    The client, provided in the first argument, gives an interface to
    the central server. This is needed to create tasks (for the partial
    results) and collect their results later on. Note that this client
    is a different client than the client you use as a user.
    """

    # Info messages can help you when an algorithm crashes. These info
    # messages are stored in a log file which is sent to the server when
    # a task either finishes or crashes.
    info('Collecting participating organizations')

    # Collect all organizations that participate in this collaboration.
    # These organizations will receive the task to compute the partial.
    organizations = client.get_organizations_in_my_collaboration()
    ids = [organization.get("id") for organization in organizations]

    # Request all participating parties to compute their partial. This
    # will create a new task at the central server for them to pick up.
    # We've used a kwarg but it is also possible to use `args`. We
    # prefer kwargs as they are clearer.
    info('Requesting partial computation')
    task = client.create_new_task(
        input_={
            'method': 'average_partial',
            'kwargs': {
                'column_name': column_name
            }
        },
        organization_ids=ids
    )

    # Now we need to wait until all organizations(/nodes) have finished
    # their partial. We do this by polling the server for results. It is
    # also possible to subscribe to a websocket channel to get status
    # updates.
    info("Waiting for results")
    task_id = task.get("id")
    task = client.get_task(task_id)
    while not task.get("complete"):
        task = client.get_task(task_id)
        info("Waiting for results")
        time.sleep(1)

    # Once we know the partials are complete, we can collect them.
    info("Obtaining results")
    results = client.get_results(task_id=task.get("id"))

    # Now we can combine the partials to a global average.
    global_sum = 0
    global_count = 0
    for result in results:
        global_sum += result["sum"]
        global_count += result["count"]

    return {"average": global_sum / global_count}


def RPC_average_partial(data, column_name):
    """Compute the average partial

    The data argument contains a pandas-dataframe containing the local
    data from the node.
    """

    # extract the column_name from the dataframe.
    info(f'Extracting column {column_name}')
    numbers = data[column_name]

    # compute the sum, and count number of rows
    info('Computing partials')
    local_sum = numbers.sum()
    local_count = len(numbers)

    # return the values as a dict
    return {
        "sum": local_sum,
        "count": local_count
    }
```
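The polling loop in master waits indefinitely if a node never finishes. A variant with an upper bound could look like the sketch below. The wait_for_task helper, its interval and timeout parameters, and the FakeClient used to demonstrate it are hypothetical; the only call taken from the code above is client.get_task.

```python
import time


def wait_for_task(client, task_id, interval=1.0, timeout=60.0):
    """Poll `client.get_task` until the task is complete or `timeout` passes."""
    deadline = time.monotonic() + timeout
    task = client.get_task(task_id)
    while not task.get("complete"):
        if time.monotonic() > deadline:
            raise TimeoutError(f"task {task_id} did not complete in time")
        time.sleep(interval)
        task = client.get_task(task_id)
    return task


class FakeClient:
    """Hypothetical stand-in that reports completion on the third poll."""

    def __init__(self):
        self.polls = 0

    def get_task(self, task_id):
        self.polls += 1
        return {"id": task_id, "complete": self.polls >= 3}


fake = FakeClient()
finished = wait_for_task(fake, task_id=1, interval=0.01)
print(finished, fake.polls)
```

Whether a timeout is appropriate depends on how long the partial computation is expected to take at the slowest node.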

​
🏑 Local testing

Now that we have a vantage6 implementation of the algorithm it is time to test it. Before we run it in a vantage6 setup we can test it locally by using the ClientMockProtocol, which simulates the communication with the central server.
Before we can test it locally we need to install the algorithm package (in editable mode) so that the mock client can use it. Simply move to the root directory of your algorithm package (with the setup.py file) and run the following:
```bash
pip install -e .
```
Then create a script to test the algorithm:
```python
from vantage6.tools.mock_client import ClientMockProtocol

# Initialize the mock server. The datasets simulate the local datasets from
# the node. In this case we have two parties having two different datasets:
# a.csv and b.csv. The module name needs to be the name of your algorithm
# package. This is the name you specified in `setup.py`, in our case that
# would be v6-average-py.
client = ClientMockProtocol(
    datasets=["local/a.csv", "local/b.csv"],
    module="v6-average-py"
)

# to inspect which organizations are in your mock client, you can run the
# following
organizations = client.get_organizations_in_my_collaboration()
org_ids = [organization["id"] for organization in organizations]

# we can either test an RPC method or the master method (which will trigger the
# RPC methods also). Let's start by triggering an RPC method and see if that
# works. Note that we do *not* specify the RPC_ prefix for the method! In this
# example we assume that both a.csv and b.csv contain a numerical column `age`.
average_partial_task = client.create_new_task(
    input_={
        'method': 'average_partial',
        'kwargs': {
            'column_name': 'age'
        }
    },
    organization_ids=org_ids
)

# You can directly obtain the result (we don't have to wait for nodes to
# complete the tasks)
results = client.get_results(average_partial_task.get("id"))
print(results)

# To trigger the master method you also need to supply the `master`-flag
# to the input. Also note that we only supply the task to a single organization
# as we only want to execute the central part of the algorithm once. The master
# task takes care of the distribution to the other parties.
average_task = client.create_new_task(
    input_={
        'master': 1,
        'method': 'master',
        'kwargs': {
            'column_name': 'age'
        }
    },
    organization_ids=[org_ids[0]]
)
results = client.get_results(average_task.get("id"))
print(results)
```

​
🏗 Building and 🚛 Distributing

Now that we have a fully tested algorithm for the vantage6 infrastructure, we need to package it so that it can be distributed to the data stations/nodes. Algorithms are delivered as Docker images, which is what the Dockerfile is for. To build an image from our algorithm (make sure you have Docker installed and running), run the following command from the root directory of your algorithm project:
```bash
docker build -t harbor2.vantage6.ai/demo/average .
```
The option -t specifies the (unique) identifier used by the researcher to use this algorithm. Usually this includes the registry address (harbor2.vantage6.ai) and the project name (demo).
In case you are using Docker Hub as registry, you do not have to specify the registry or project, as these default to Docker Hub and your Docker Hub username.
Then push the image to the registry:
```bash
docker push harbor2.vantage6.ai/demo/average
```
Reach out to us on Discord if you want to use our registries (harbor.vantage6.ai and harbor2.vantage6.ai).

​
🀞 Cross-language serialization

It is possible that a vantage6 algorithm is developed in one programming language, but you would like to run the task from another language. For these kinds of use cases the Python algorithm wrapper and client support cross-language serialization. By default, input to the algorithms and output back to the client are serialized using pickle. However, it is possible to define a different serialization format.
Input and output serialization can be specified as follows:
```python
client.post_task(
    name='mytask',
    image='harbor2.vantage6.ai/testing/v6-test-py',
    collaboration_id=COLLABORATION_ID,
    organization_ids=ORGANIZATION_IDS,
    data_format='json',  # Specify input format to the algorithm
    input_={
        'method': 'column_names',
        'kwargs': {'data_format': 'json'},  # Specify output format
    }
)
```
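Why a format such as JSON helps here can be seen from a small comparison. The snippet only uses Python's standard library and does not call vantage6; the input_ dictionary mirrors the one above.

```python
import json
import pickle

input_ = {"method": "column_names", "kwargs": {"data_format": "json"}}

# pickle produces a Python-specific byte stream.
pickled = pickle.dumps(input_)

# JSON produces plain text that most languages can parse.
encoded = json.dumps(input_)
print(encoded)

# Both formats round-trip to the same dictionary in Python.
assert pickle.loads(pickled) == input_
assert json.loads(encoded) == input_
```

A client written in another language can produce and read the JSON text, whereas the pickle bytes are only practical to handle from Python.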