A Coding Analysis of Federated Learning Experiments Augmented with Decentralized Gossip and Differential Privacy Mechanisms

In this tutorial, we examine how federated learning behaves when the central aggregation server is removed and replaced with a peer-to-peer gossip mechanism. We implement both centralized FedAvg and decentralized Gossip Federated Learning from scratch and introduce client-side differential privacy by injecting calibrated noise into local model updates. Using controlled experiments on non-IID MNIST data, we examine how the strength of the privacy guarantee, measured by different epsilon values, affects the convergence speed, stability, and final accuracy of the model. We also study the practical trade-off between privacy guarantees and learning performance in realistic distributed learning systems.
import os, math, random, time
from dataclasses import dataclass
from typing import Dict, List, Tuple
import subprocess, sys

def pip_install(pkgs):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q"] + pkgs)

pip_install(["torch", "torchvision", "numpy", "matplotlib", "networkx", "tqdm"])

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import networkx as nx
from tqdm import trange

SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = False  # favor speed; bitwise reproducibility is not required here
torch.backends.cudnn.benchmark = True
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

transform = transforms.Compose([transforms.ToTensor()])
train_ds = datasets.MNIST(root="/content/data", train=True, download=True, transform=transform)
test_ds = datasets.MNIST(root="/content/data", train=False, download=True, transform=transform)
We set up the workspace and install all the necessary dependencies. We fix random seeds and select the compute device so that runs stay comparable across experiments. We also load the MNIST dataset, which serves as a lightweight but effective benchmark for federated learning experiments.
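Before moving on, we can optionally sanity-check the environment. The short snippet below is purely illustrative and only uses the objects defined above; it confirms the device in use and the dataset shapes.

# Optional sanity check of the setup (uses only objects defined above).
print("device:", device)
print("train samples:", len(train_ds), "| test samples:", len(test_ds))
xb, yb = train_ds[0]
print("sample shape:", tuple(xb.shape), "| label:", yb)   # expect (1, 28, 28)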
def make_noniid_clients(dataset, num_clients=20, shards_per_client=2, seed=SEED):
    rng = np.random.default_rng(seed)
    y = np.array([dataset[i][1] for i in range(len(dataset))])
    idx = np.arange(len(dataset))
    idx_sorted = idx[np.argsort(y)]
    num_shards = num_clients * shards_per_client
    shard_size = len(dataset) // num_shards
    shards = [idx_sorted[i*shard_size:(i+1)*shard_size] for i in range(num_shards)]
    rng.shuffle(shards)
    client_indices = []
    for c in range(num_clients):
        take = shards[c*shards_per_client:(c+1)*shards_per_client]
        client_indices.append(np.concatenate(take))
    return client_indices

NUM_CLIENTS = 20
client_indices = make_noniid_clients(train_ds, num_clients=NUM_CLIENTS, shards_per_client=2)
test_loader = DataLoader(test_ds, batch_size=1024, shuffle=False, num_workers=2, pin_memory=True)
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(28*28, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
We construct a non-IID data distribution by partitioning the training dataset into label-based shards across multiple clients. We then define a compact multilayer perceptron (MLP) that balances expressiveness and computational efficiency. This lets us realistically simulate data heterogeneity, a key challenge in federated learning systems.
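To confirm the split is genuinely non-IID, we can inspect the label histogram of a few clients. A minimal check using the client_indices produced above (the choice of three clients is arbitrary): with shards_per_client=2, each client should hold roughly two labels.

# Inspect label distribution for the first three clients.
from collections import Counter
y_all = np.array([train_ds[i][1] for i in range(len(train_ds))])
for cid in range(3):
    counts = Counter(y_all[client_indices[cid]].tolist())
    print(f"client {cid}: {dict(sorted(counts.items()))}")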
def get_model_params(model):
    return {k: v.detach().clone() for k, v in model.state_dict().items()}

def set_model_params(model, params):
    model.load_state_dict(params, strict=True)

def add_params(a, b):
    return {k: a[k] + b[k] for k in a.keys()}

def sub_params(a, b):
    return {k: a[k] - b[k] for k in a.keys()}

def scale_params(a, s):
    return {k: a[k] * s for k in a.keys()}

def mean_params(params_list):
    out = {k: torch.zeros_like(params_list[0][k]) for k in params_list[0].keys()}
    for p in params_list:
        for k in out.keys():
            out[k] += p[k]
    for k in out.keys():
        out[k] /= len(params_list)
    return out

def l2_norm_params(delta):
    # Global L2 norm over all tensors in the update.
    sq = 0.0
    for v in delta.values():
        sq += float(torch.sum(v.float() * v.float()).item())
    return math.sqrt(sq)

def dp_sanitize_update(delta, clip_norm, epsilon, delta_dp, rng):
    # Clip the whole update to a maximum L2 norm, then add Gaussian noise
    # calibrated to the (epsilon, delta)-DP budget for a single release.
    norm = l2_norm_params(delta)
    scale = min(1.0, clip_norm / (norm + 1e-12))
    clipped = scale_params(delta, scale)
    if epsilon is None or math.isinf(epsilon) or epsilon <= 0:
        return clipped
    sigma = clip_norm * math.sqrt(2.0 * math.log(1.25 / delta_dp)) / epsilon
    noised = {}
    for k, v in clipped.items():
        noise = torch.normal(mean=0.0, std=sigma, size=v.shape, generator=rng, device=v.device, dtype=v.dtype)
        noised[k] = v + noise
    return noised
We define parameter-manipulation helpers for adding, subtracting, scaling, and averaging model weights across clients. We implement differential privacy by clipping local updates and adding Gaussian noise, both calibrated to the chosen privacy budget. This serves as the core privacy mechanism that lets us study the privacy-utility trade-off in both the centralized and the decentralized setting.
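To see the sanitizer in action, the short sketch below exercises dp_sanitize_update on a synthetic update. The toy tensor and the epsilon value are illustrative only and are not part of the experiment.

# Toy update with L2 norm 12, well above clip_norm=2.
toy_delta = {"w": torch.full((4, 4), 3.0)}
g = torch.Generator().manual_seed(0)
clipped = dp_sanitize_update(toy_delta, clip_norm=2.0, epsilon=None, delta_dp=1e-5, rng=g)
print("norm after clipping:", round(l2_norm_params(clipped), 4))   # ~2.0, no noise added
noised = dp_sanitize_update(toy_delta, clip_norm=2.0, epsilon=4.0, delta_dp=1e-5, rng=g)
print("norm after clip + noise:", round(l2_norm_params(noised), 4))
print("Gaussian sigma:", 2.0 * math.sqrt(2.0 * math.log(1.25 / 1e-5)) / 4.0)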
def local_train_one_client(base_params, client_id, epochs, lr, batch_size, weight_decay=0.0):
    model = MLP().to(device)
    set_model_params(model, base_params)
    model.train()
    indices = client_indices[client_id]
    loader = DataLoader(
        Subset(train_ds, indices.tolist() if hasattr(indices, "tolist") else indices),
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
        pin_memory=True
    )
    opt = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
    for _ in range(epochs):
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad(set_to_none=True)
            logits = model(xb)
            loss = F.cross_entropy(logits, yb)
            loss.backward()
            opt.step()
    return get_model_params(model)

@torch.no_grad()
def evaluate(params):
    model = MLP().to(device)
    set_model_params(model, params)
    model.eval()
    total, correct = 0, 0
    loss_sum = 0.0
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = F.cross_entropy(logits, yb, reduction="sum")
        loss_sum += float(loss.item())
        pred = torch.argmax(logits, dim=1)
        correct += int((pred == yb).sum().item())
        total += int(yb.numel())
    return loss_sum / total, correct / total
We define the local training loop that each client runs independently on its private data, along with a shared evaluation routine that estimates the test loss and accuracy of any model state. Together, these functions simulate federated learning behavior in which training and evaluation are fully decoupled from data ownership.
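As a quick usage example (the hyperparameters here are illustrative, not the sweep settings), we can train a single client from a fresh model and evaluate it; accuracy should stay low, since each client sees only about two digit classes.

init_params = get_model_params(MLP().to(device))
trained = local_train_one_client(init_params, client_id=0, epochs=1, lr=0.06, batch_size=64)
loss0, acc0 = evaluate(trained)
print(f"client-0 model | test loss {loss0:.4f} | test acc {acc0:.4f}")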
@dataclass
class FedAvgConfig:
    rounds: int = 25
    clients_per_round: int = 10
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf
    delta_dp: float = 1e-5

def run_fedavg(cfg):
    global_params = get_model_params(MLP().to(device))
    history = {"test_loss": [], "test_acc": []}
    for r in trange(cfg.rounds):
        chosen = random.sample(range(NUM_CLIENTS), k=cfg.clients_per_round)
        start_params = global_params
        updates = []
        for cid in chosen:
            local_params = local_train_one_client(start_params, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(local_params, start_params)
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            sanitized = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            updates.append(sanitized)
        avg_update = mean_params(updates)
        global_params = add_params(start_params, avg_update)
        tl, ta = evaluate(global_params)
        history["test_loss"].append(tl)
        history["test_acc"].append(ta)
    return history, global_params
We implement the centralized FedAvg algorithm, in which a sampled set of clients trains locally and sends privatized updates to a central aggregator. We track model performance across communication rounds to observe convergence behavior under various privacy budgets. This serves as the baseline against which gossip-based learning is compared.
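Before launching the full sweep, an optional smoke test (the round and client counts here are arbitrary) confirms the FedAvg loop runs end to end without privacy noise.

# Two quick FedAvg rounds, no DP noise (epsilon = inf).
smoke_cfg = FedAvgConfig(rounds=2, clients_per_round=5, epsilon=math.inf)
smoke_hist, _ = run_fedavg(smoke_cfg)
print("smoke-test accuracies:", [round(a, 4) for a in smoke_hist["test_acc"]])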
@dataclass
class GossipConfig:
    rounds: int = 25
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf
    delta_dp: float = 1e-5
    topology: str = "ring"
    p: float = 0.2
    gossip_pairs_per_round: int = 10

def build_topology(cfg):
    if cfg.topology == "ring":
        G = nx.cycle_graph(NUM_CLIENTS)
    elif cfg.topology == "erdos_renyi":
        G = nx.erdos_renyi_graph(NUM_CLIENTS, cfg.p, seed=SEED)
        if not nx.is_connected(G):
            # Stitch disconnected components together so gossip can reach every node.
            comps = list(nx.connected_components(G))
            for i in range(len(comps) - 1):
                a = next(iter(comps[i]))
                b = next(iter(comps[i+1]))
                G.add_edge(a, b)
    else:
        raise ValueError(f"unknown topology: {cfg.topology}")
    return G

def run_gossip(cfg):
    node_params = [get_model_params(MLP().to(device)) for _ in range(NUM_CLIENTS)]
    G = build_topology(cfg)
    history = {"avg_test_loss": [], "avg_test_acc": []}
    for r in trange(cfg.rounds):
        new_params = []
        for cid in range(NUM_CLIENTS):
            p0 = node_params[cid]
            p_local = local_train_one_client(p0, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(p_local, p0)
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            sanitized = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            new_params.append(add_params(p0, sanitized))
        node_params = new_params
        edges = list(G.edges())
        for _ in range(cfg.gossip_pairs_per_round):
            # Pick a random edge and average the two endpoint models.
            i, j = random.choice(edges)
            avg = mean_params([node_params[i], node_params[j]])
            node_params[i] = avg
            node_params[j] = avg
        losses, accs = [], []
        for cid in range(NUM_CLIENTS):
            tl, ta = evaluate(node_params[cid])
            losses.append(tl)
            accs.append(ta)
        history["avg_test_loss"].append(float(np.mean(losses)))
        history["avg_test_acc"].append(float(np.mean(accs)))
    return history, node_params
We implement decentralized Gossip Federated Learning as peer-to-peer parameter averaging over a predefined network topology. Each node trains locally and then averages its parameters with a randomly chosen neighbor, without relying on a central server. This lets us analyze how privacy noise spreads through decentralized communication patterns and affects convergence.
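To get a feel for the communication graphs, a small sketch using build_topology instantiates both topologies and inspects them (the p value here is illustrative).

ring_cfg = GossipConfig(topology="ring")
G_ring = build_topology(ring_cfg)
print("ring:", G_ring.number_of_nodes(), "nodes,", G_ring.number_of_edges(), "edges")
er_cfg = GossipConfig(topology="erdos_renyi", p=0.2)
G_er = build_topology(er_cfg)
print("erdos_renyi connected:", nx.is_connected(G_er), "| edges:", G_er.number_of_edges())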
eps_sweep = [math.inf, 8.0, 4.0, 2.0, 1.0]
ROUNDS = 20
fedavg_results = {}
gossip_results = {}
common_local_epochs = 1
common_lr = 0.06
common_bs = 64
common_clip = 2.0
common_delta = 1e-5

for eps in eps_sweep:
    fcfg = FedAvgConfig(
        rounds=ROUNDS,
        clients_per_round=10,
        local_epochs=common_local_epochs,
        lr=common_lr,
        batch_size=common_bs,
        clip_norm=common_clip,
        epsilon=eps,
        delta_dp=common_delta
    )
    hist_f, _ = run_fedavg(fcfg)
    fedavg_results[eps] = hist_f
    gcfg = GossipConfig(
        rounds=ROUNDS,
        local_epochs=common_local_epochs,
        lr=common_lr,
        batch_size=common_bs,
        clip_norm=common_clip,
        epsilon=eps,
        delta_dp=common_delta,
        topology="ring",
        gossip_pairs_per_round=10
    )
    hist_g, _ = run_gossip(gcfg)
    gossip_results[eps] = hist_g
plt.figure(figsize=(10, 5))
for eps in eps_sweep:
    plt.plot(fedavg_results[eps]["test_acc"], label=f"FedAvg eps={eps}")
plt.xlabel("Round")
plt.ylabel("Accuracy")
plt.title("FedAvg: test accuracy per round")
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 5))
for eps in eps_sweep:
    plt.plot(gossip_results[eps]["avg_test_acc"], label=f"Gossip eps={eps}")
plt.xlabel("Round")
plt.ylabel("Avg Accuracy")
plt.title("Gossip: average node test accuracy per round")
plt.legend()
plt.grid(True)
plt.show()

final_fed = [fedavg_results[eps]["test_acc"][-1] for eps in eps_sweep]
final_gos = [gossip_results[eps]["avg_test_acc"][-1] for eps in eps_sweep]
x = [100.0 if math.isinf(eps) else eps for eps in eps_sweep]  # plot eps=inf at x=100
plt.figure(figsize=(8, 5))
plt.plot(x, final_fed, marker="o", label="FedAvg")
plt.plot(x, final_gos, marker="o", label="Gossip")
plt.xlabel("Epsilon")
plt.ylabel("Final Accuracy")
plt.title("Final accuracy vs. privacy budget")
plt.legend()
plt.grid(True)
plt.show()
def rounds_to_threshold(acc_curve, threshold):
    # First round (1-indexed) at which the accuracy curve reaches the threshold.
    for i, a in enumerate(acc_curve):
        if a >= threshold:
            return i + 1
    return None

best_f = fedavg_results[math.inf]["test_acc"][-1]
best_g = gossip_results[math.inf]["avg_test_acc"][-1]
th_f = 0.9 * best_f
th_g = 0.9 * best_g
print("eps | rounds to 90% of non-private accuracy (FedAvg, Gossip)")
for eps in eps_sweep:
    rf = rounds_to_threshold(fedavg_results[eps]["test_acc"], th_f)
    rg = rounds_to_threshold(gossip_results[eps]["avg_test_acc"], th_g)
    print(eps, rf, rg)
We run controlled experiments across multiple privacy levels and collect results for both the centralized and the decentralized training strategy. We visualize convergence trends and final accuracy to expose the privacy-utility trade-off, and we compute a rounds-to-threshold metric to quantitatively compare how the two schemes respond to tightening privacy constraints.
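For a compact numeric summary alongside the plots, we can also print the final accuracies per epsilon (a small optional addition that only uses the results dictionaries above).

print(f"{'epsilon':>8} | {'FedAvg':>8} | {'Gossip':>8}")
for eps in eps_sweep:
    f_acc = fedavg_results[eps]["test_acc"][-1]
    g_acc = gossip_results[eps]["avg_test_acc"][-1]
    label = "inf" if math.isinf(eps) else f"{eps:g}"
    print(f"{label:>8} | {f_acc:8.4f} | {g_acc:8.4f}")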
In conclusion, we have shown that decentralization fundamentally changes how differential privacy noise propagates through the system. While centralized FedAvg generally converges faster under weak privacy constraints, gossip-based learning is more robust to noisy updates at the expense of slower convergence. Our experiments highlight that tight privacy budgets slow learning in both settings, but the effect is magnified in decentralized topologies because information mixes more slowly across the network. Overall, designing privacy-preserving federated systems requires reasoning jointly about network topology, communication patterns, and privacy budgets rather than treating them as independent choices.



