📡 Telemetry Protocol¶
🚀 Sending Strategy¶
Batching¶
┌──────────────────────────────────────┐
│ Events accumulate locally            │
│                  ↓                   │
│ Batch every 5 minutes OR 100 events  │
│                  ↓                   │
│ Compress (gzip)                      │
│                  ↓                   │
│ POST to telemetry endpoint           │
└──────────────────────────────────────┘
Benefits:
- Fewer network calls (battery friendly)
- Lower bandwidth usage (cost efficient)
- Better user experience (no constant network activity)
- Resilience to temporary network issues
Implementation:
class TelemetryBatcher {
public:
TelemetryBatcher()
: batchSize_(100)
, batchIntervalSeconds_(300) // 5 minutes
{
startBatchTimer();
}
void enqueue(const TelemetryEvent& event) {
std::lock_guard<std::mutex> lock(mutex_);
batch_.push_back(event);
// Flush if batch is full
if (batch_.size() >= batchSize_) {
flush();
}
}
void flush() {
if (batch_.empty()) return;
// Create batch payload
nlohmann::json payload = {
{"batch_id", generateUUID()},
{"timestamp", getCurrentTimestamp()},
{"events", nlohmann::json::array()}
};
for (const auto& event : batch_) {
payload["events"].push_back(event.toJSON());
}
// Compress
std::string json_str = payload.dump();
std::vector<uint8_t> compressed = gzipCompress(json_str);
// Send
sendBatch(compressed);
// Clear batch
batch_.clear();
}
private:
std::vector<TelemetryEvent> batch_;
size_t batchSize_;
int batchIntervalSeconds_;
std::mutex mutex_;
void startBatchTimer() {
batchTimer_ = std::make_unique<Timer>(
std::chrono::seconds(batchIntervalSeconds_),
[this]() { flush(); }
);
}
void sendBatch(const std::vector<uint8_t>& compressed_data) {
httpClient_.post(
TELEMETRY_ENDPOINT,
compressed_data,
{
{"Content-Type", "application/gzip"},
{"Content-Encoding", "gzip"},
{"X-API-Key", API_KEY}
}
);
}
};
Compression¶
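Compression Ratio: roughly 80% size reduction on typical JSON batches (for example, a 50 KB batch compresses to about 10 KB).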
Implementation:
std::vector<uint8_t> gzipCompress(const std::string& data) {
z_stream stream;
stream.zalloc = Z_NULL;
stream.zfree = Z_NULL;
stream.opaque = Z_NULL;
if (deflateInit2(&stream, Z_DEFAULT_COMPRESSION,
Z_DEFLATED, 15 + 16, 8,
Z_DEFAULT_STRATEGY) != Z_OK) {
throw std::runtime_error("Failed to initialize gzip");
}
stream.avail_in = data.size();
stream.next_in = reinterpret_cast<Bytef*>(const_cast<char*>(data.data()));
std::vector<uint8_t> compressed;
compressed.reserve(data.size());
int ret;
do {
uint8_t buffer[4096];
stream.avail_out = sizeof(buffer);
stream.next_out = buffer;
ret = deflate(&stream, Z_FINISH);
size_t compressed_size = sizeof(buffer) - stream.avail_out;
compressed.insert(compressed.end(), buffer, buffer + compressed_size);
} while (ret == Z_OK);
deflateEnd(&stream);
if (ret != Z_STREAM_END) {
throw std::runtime_error("Gzip compression failed");
}
return compressed;
}
🔄 Offline Queueing¶
Strategy:
class TelemetryQueue {
public:
void enqueue(const TelemetryEvent& event) {
// Add to in-memory queue
inMemoryQueue_.push_back(event);
// Persist to disk (SQLite)
persistEvent(event);
// Try to send if online
if (isOnline() && inMemoryQueue_.size() >= BATCH_SIZE) {
flush();
}
}
void flush() {
if (!isOnline()) {
LOG_DEBUG("Offline - events queued for later");
return;
}
auto events = loadPendingEvents();
if (events.empty()) return;
// Send batch
if (sendBatch(events)) {
// Success - delete from queue
deletePendingEvents(events);
} else {
// Failure - keep in queue, retry later
scheduleRetry();
}
}
private:
std::vector<TelemetryEvent> inMemoryQueue_;
std::unique_ptr<SQLite::Database> db_;
void persistEvent(const TelemetryEvent& event) {
SQLite::Statement query(
*db_,
"INSERT INTO telemetry_queue (event_json, timestamp) VALUES (?, ?)"
);
query.bind(1, event.toJSON().dump());
query.bind(2, event.timestamp);
query.exec();
}
std::vector<TelemetryEvent> loadPendingEvents(int limit = 100) {
SQLite::Statement query(
*db_,
"SELECT event_json FROM telemetry_queue ORDER BY timestamp LIMIT ?"
);
query.bind(1, limit);
std::vector<TelemetryEvent> events;
while (query.executeStep()) {
std::string json_str = query.getColumn(0).getString();
auto json = nlohmann::json::parse(json_str);
events.push_back(TelemetryEvent::fromJSON(json));
}
return events;
}
void deletePendingEvents(const std::vector<TelemetryEvent>& events) {
// Delete successfully sent events
for (const auto& event : events) {
SQLite::Statement query(
*db_,
"DELETE FROM telemetry_queue WHERE event_json = ?"
);
query.bind(1, event.toJSON().dump());
query.exec();
}
}
bool isOnline() {
// Quick connectivity check
return httpClient_.canConnect(TELEMETRY_ENDPOINT, std::chrono::seconds(1));
}
};
Queue Size Limits:
// Prevent queue from growing unbounded
void enforceQueueLimit() {
const size_t MAX_QUEUE_SIZE = 10000; // ~10 MB at 1 KB/event
const size_t MAX_AGE_DAYS = 30;
// 1. Delete oldest events if queue too large
SQLite::Statement count_query(*db_, "SELECT COUNT(*) FROM telemetry_queue");
count_query.executeStep();
size_t queue_size = count_query.getColumn(0).getInt();
if (queue_size > MAX_QUEUE_SIZE) {
size_t to_delete = queue_size - MAX_QUEUE_SIZE;
SQLite::Statement delete_query(
*db_,
"DELETE FROM telemetry_queue WHERE id IN "
"(SELECT id FROM telemetry_queue ORDER BY timestamp LIMIT ?)"
);
delete_query.bind(1, static_cast<int>(to_delete));
delete_query.exec();
LOG_WARNING("Telemetry queue overflow - deleted oldest events",
{{"deleted", to_delete}});
}
// 2. Delete events older than 30 days
auto cutoff = getCurrentTimestamp() - std::chrono::days(MAX_AGE_DAYS);
SQLite::Statement age_query(
*db_,
"DELETE FROM telemetry_queue WHERE timestamp < ?"
);
age_query.bind(1, cutoff);
age_query.exec();
}
⏱️ Retry Logic¶
Exponential Backoff:
/// Tracks retry attempts for one operation and computes exponential-backoff
/// delays (base 1 s, doubling per failure, capped at 5 min, 5 attempts).
class RetryStrategy {
public:
    RetryStrategy()
        : baseDelay_(std::chrono::seconds(1))
        , maxDelay_(std::chrono::minutes(5))
        , maxAttempts_(5)
        , currentAttempt_(0)
    {}

    /// @return true while fewer than `maxAttempts_` attempts have been recorded.
    bool shouldRetry() const {
        return currentAttempt_ < maxAttempts_;
    }

    /// @return the number of attempts recorded since the last success.
    int getCurrentAttempt() const {
        return currentAttempt_;
    }

    /// @return how long to wait before the next attempt: zero before the
    ///         first attempt, then baseDelay_ * 2^(attempts - 1), never more
    ///         than `maxDelay_`.
    std::chrono::seconds getNextDelay() const {
        if (currentAttempt_ <= 0) {
            return std::chrono::seconds(0);  // Immediate first attempt
        }

        // Exponential backoff: 1s, 2s, 4s, 8s with the default 5 attempts.
        // Beyond this shift the result is far above any sensible cap, and a
        // larger shift would overflow, so go straight to the cap.
        constexpr int kMaxShift = 30;
        const int exponent = currentAttempt_ - 1;
        if (exponent >= kMaxShift) {
            return maxDelay_;
        }

        const std::chrono::seconds delay =
            baseDelay_ * (std::chrono::seconds::rep{1} << exponent);

        // Cap at max delay
        return std::min(delay, maxDelay_);
    }

    /// Counts one attempt.
    void recordAttempt() {
        ++currentAttempt_;
    }

    /// Resets the attempt counter after a successful operation.
    void recordSuccess() {
        currentAttempt_ = 0;
    }

    /// Counts a failed attempt (same bookkeeping as recordAttempt()).
    void recordFailure() {
        recordAttempt();
    }

private:
    std::chrono::seconds baseDelay_;
    std::chrono::seconds maxDelay_;
    int maxAttempts_;
    int currentAttempt_;
};
Retry Schedule:
Attempt 1: Immediate
Attempt 2: Wait 1 second
Attempt 3: Wait 2 seconds
Attempt 4: Wait 4 seconds
Attempt 5: Wait 8 seconds
Then: Discard (don't fill disk)
Implementation:
/// Posts `events` to the telemetry endpoint, retrying with the backoff
/// schedule of a fresh RetryStrategy. Blocks the calling thread while
/// waiting between attempts. The batch is dropped (with a warning) when
/// every attempt fails or when the server rejects it with a client error
/// that a retry cannot fix.
///
/// @param events  Events to send as a single batch.
void sendBatchWithRetry(const std::vector<TelemetryEvent>& events) {
    RetryStrategy retry;
    // Counted locally so the log fields do not depend on an accessor that
    // RetryStrategy may not expose.
    int failedAttempts = 0;

    while (retry.shouldRetry()) {
        const auto delay = retry.getNextDelay();

        if (delay.count() > 0) {
            LOG_DEBUG("Retrying telemetry send", {
                {"attempt", failedAttempts},
                {"delay_seconds", delay.count()}
            });
            std::this_thread::sleep_for(delay);
        }

        try {
            auto response = httpClient_.post(
                TELEMETRY_ENDPOINT,
                createBatchPayload(events)
            );
            const auto status = response.statusCode;

            if (status == 200) {
                retry.recordSuccess();
                LOG_INFO("Telemetry batch sent", {
                    {"events", events.size()}
                });
                return;  // Success!
            }

            retry.recordFailure();
            ++failedAttempts;
            LOG_WARNING("Telemetry send failed", {
                {"status_code", status},
                {"attempt", failedAttempts}
            });

            // A 4xx means the request itself was rejected; resending the
            // same payload cannot succeed. 408 (timeout) and 429 (rate
            // limited) are transient and stay retriable.
            const bool permanentRejection =
                status >= 400 && status < 500 && status != 408 && status != 429;
            if (permanentRejection) {
                LOG_WARNING("Telemetry batch rejected by server - not retrying", {
                    {"status_code", status},
                    {"events", events.size()}
                });
                return;
            }
        } catch (const std::exception& e) {
            retry.recordFailure();
            ++failedAttempts;
            LOG_ERROR("Telemetry send exception", {
                {"error", e.what()},
                {"attempt", failedAttempts}
            });
        }
    }

    // Max retries exceeded - discard events
    LOG_WARNING("Telemetry batch discarded after max retries", {
        {"events", events.size()}
    });
}
📊 Bandwidth Limits¶
Target: < 100 KB/day per user
Actual: ~50 KB/day (batched + compressed)
Breakdown:
Average event size (JSON): 500 bytes
Average batch size: 100 events = 50 KB
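Compression ratio: ~80% size reduction (about 5:1)
Compressed batch: 10 KB
Batches per day: ~5 full batches' worth (about 500 events); a batch is sent every 5 minutes while active or as soon as 100 events accumulate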
Total bandwidth: 50 KB/day
Enforcement:
/// Enforces a byte budget per 24-hour window (100 KB by default).
///
/// Usage: ask `canSend(bytes)` before sending and call `recordUsage(bytes)`
/// after a successful send. The window restarts 24 hours after the previous
/// reset, measured with `std::chrono::system_clock`.
class BandwidthLimiter {
public:
    BandwidthLimiter()
        : dailyLimit_(100 * 1024)  // 100 KB
        , currentUsage_(0)
        , lastReset_(std::chrono::system_clock::now())
    {}

    /// @param bytes  Size of the payload about to be sent.
    /// @return true if sending `bytes` keeps usage within the daily limit;
    ///         otherwise logs a warning and returns false.
    bool canSend(size_t bytes) {
        resetIfNeeded();

        // Compare against the remaining budget rather than adding first:
        // `currentUsage_ + bytes` can wrap around for a huge `bytes` and
        // would then slip under the limit.
        const size_t remaining =
            currentUsage_ < dailyLimit_ ? dailyLimit_ - currentUsage_ : 0;
        if (bytes > remaining) {
            LOG_WARNING("Telemetry bandwidth limit exceeded", {
                {"current_usage", currentUsage_},
                {"limit", dailyLimit_}
            });
            return false;
        }

        return true;
    }

    /// Adds `bytes` to the current window's usage (saturating, never wrapping).
    void recordUsage(size_t bytes) {
        // Roll the window first so bytes sent after the 24 h mark are not
        // charged to the window that just ended and then wiped.
        resetIfNeeded();

        const size_t kMax = static_cast<size_t>(-1);
        currentUsage_ = bytes > kMax - currentUsage_ ? kMax : currentUsage_ + bytes;
    }

private:
    size_t dailyLimit_;
    size_t currentUsage_;
    std::chrono::system_clock::time_point lastReset_;

    /// Starts a new window once at least 24 hours have passed since the last reset.
    void resetIfNeeded() {
        const auto now = std::chrono::system_clock::now();
        const auto elapsed = std::chrono::duration_cast<std::chrono::hours>(
            now - lastReset_
        );

        if (elapsed.count() >= 24) {
            currentUsage_ = 0;
            lastReset_ = now;
        }
    }
};
🔌 API Endpoints¶
POST /v1/events¶
Request:
POST /v1/events HTTP/1.1
Host: telemetry.audiolab.com
Content-Type: application/gzip
Content-Encoding: gzip
X-API-Key: your-api-key
X-Client-Version: 1.2.3
[gzipped JSON payload]
Payload (uncompressed):
{
"batch_id": "b9e4f7a2-...",
"timestamp": "2025-01-15T10:30:00Z",
"client_version": "1.2.3",
"events": [
{
"event": "plugin_loaded",
"timestamp": "2025-01-15T10:30:00Z",
// ... event data
},
{
"event": "feature_used",
"timestamp": "2025-01-15T10:35:00Z",
// ... event data
}
]
}
Response (Success):
HTTP/1.1 200 OK
Content-Type: application/json
{
"batch_id": "b9e4f7a2-...",
"received": 2,
"timestamp": "2025-01-15T10:30:01Z"
}
Response (Error):
HTTP/1.1 400 Bad Request
Content-Type: application/json
{
"error": "Invalid payload",
"details": "Missing required field: user_id"
}
POST /v1/telemetry/export¶
Request:
POST /v1/telemetry/export HTTP/1.1
Host: telemetry.audiolab.com
Content-Type: application/json
Authorization: Bearer user-token
{
"user_id": "a7f3c8e9d2b1..."
}
Response:
HTTP/1.1 200 OK
Content-Type: application/json
{
"user_id": "a7f3c8e9d2b1...",
"events": [
// All user's telemetry events
],
"export_timestamp": "2025-01-15T10:30:00Z"
}
POST /v1/telemetry/delete¶
Request:
POST /v1/telemetry/delete HTTP/1.1
Host: telemetry.audiolab.com
Content-Type: application/json
Authorization: Bearer user-token
{
"user_id": "a7f3c8e9d2b1...",
"request_timestamp": "2025-01-15T10:30:00Z"
}
Response:
HTTP/1.1 200 OK
Content-Type: application/json
{
"deletion_token": "xyz123...",
"estimated_completion": "2025-02-14T10:30:00Z",
"message": "Your data will be permanently deleted within 30 days"
}
🛡️ Security¶
API Key Management¶
// API key is embedded in the binary, XOR-obfuscated so it does not appear as
// a plain string. This only hides the key from casual inspection — it is not
// protection against a determined attacker.
// For production, use key rotation + certificate pinning.
//
// Returns a pointer to the decoded, NUL-terminated key. The pointer stays
// valid for the lifetime of the program. Decoding happens once, on first
// call; function-local static initialisation makes that safe under
// concurrent callers.
//
// Not `constexpr`: the function owns a static std::string, which is not
// permitted in a constexpr function.
const char* getAPIKey() {
    // XOR obfuscation (simple example)
    static const uint8_t obfuscated_key[] = {
        0x5A, 0x3C, 0x78, 0x12, /* ... */
    };
    static constexpr uint8_t kXorMask = 0xAB;  // XOR with secret

    static const std::string key = [] {
        std::string decoded(sizeof(obfuscated_key), '\0');
        for (size_t i = 0; i < sizeof(obfuscated_key); ++i) {
            decoded[i] = static_cast<char>(obfuscated_key[i] ^ kXorMask);
        }
        return decoded;
    }();

    return key.c_str();
}
Rate Limiting¶
Server-side:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address


def _rate_limit_key():
    """Return the bucket key used to rate-limit the current request.

    Uses the ``X-User-ID`` header when the client sends one and falls back to
    the remote address otherwise, so requests without the header are limited
    per client instead of all sharing a single ``None`` bucket.

    NOTE(review): ``X-User-ID`` is client-supplied and therefore spoofable; a
    caller can dodge the limit by rotating it. Prefer a key derived from the
    authenticated API key or token if one is available.
    """
    return request.headers.get('X-User-ID') or get_remote_address()


# Keyword arguments only: the position of `app` in Limiter's signature
# differs between Flask-Limiter releases.
limiter = Limiter(
    key_func=_rate_limit_key,
    app=app,
    default_limits=["100 per hour", "1000 per day"],
)
@app.route('/v1/events', methods=['POST'])
@limiter.limit("10 per minute")  # Prevent spam
def receive_events():
    """Handle ``POST /v1/events``.

    The route is limited to 10 requests per minute by the ``limiter.limit``
    decorator. The body is a placeholder: batch processing is not
    implemented in this snippet.
    """
    # Process telemetry batch
    pass
SSL/TLS Verification¶
// Transport-security configuration for the telemetry HTTP client.
// NOTE(review): the effects described below are inferred from the setter
// names — confirm against the HTTP client's API.
httpClient_.setSSLVerification(true);   // presumably turns on server-certificate verification
httpClient_.setMinTLSVersion(TLS_1_2);  // presumably refuses anything older than TLS 1.2
// Certificate pinning (optional, for extra security)
// The fingerprint below looks like a placeholder value — replace it with the
// real one before use.
const std::string EXPECTED_CERT_FINGERPRINT = "sha256/abc123...";
httpClient_.setCertificatePinning(EXPECTED_CERT_FINGERPRINT);
📈 Monitoring¶
Client-side Metrics¶
/// Plain counters describing client-side telemetry activity, plus a JSON
/// snapshot for reporting. All counters start at zero; `last_send_time`
/// keeps its default-constructed value until the first batch is recorded.
struct TelemetryStats {
    size_t events_queued = 0;
    size_t events_sent = 0;
    size_t events_failed = 0;
    size_t bytes_sent = 0;
    size_t batches_sent = 0;
    std::chrono::system_clock::time_point last_send_time;

    /// One more event was queued.
    void recordEventQueued() { events_queued += 1; }

    /// One more event was sent.
    void recordEventSent() { events_sent += 1; }

    /// One more event failed.
    void recordEventFailed() { events_failed += 1; }

    /// Adds `bytes` to the running total of bytes sent.
    void recordBytesSent(size_t bytes) { bytes_sent = bytes_sent + bytes; }

    /// Counts a sent batch and stamps the current time as the last send time.
    void recordBatchSent() {
        last_send_time = std::chrono::system_clock::now();
        batches_sent += 1;
    }

    /// @return a JSON object with one entry per counter and the formatted
    ///         last send time.
    nlohmann::json toJSON() const {
        nlohmann::json snapshot = nlohmann::json::object();
        snapshot["events_queued"] = events_queued;
        snapshot["events_sent"] = events_sent;
        snapshot["events_failed"] = events_failed;
        snapshot["bytes_sent"] = bytes_sent;
        snapshot["batches_sent"] = batches_sent;
        snapshot["last_send_time"] = formatTimestamp(last_send_time);
        return snapshot;
    }
};