add repo sync analytics #1122

Merged · 4 commits · Nov 15, 2023
41 changes: 41 additions & 0 deletions server/bleep/src/analytics.rs
@@ -73,6 +73,36 @@ impl DocEvent {
}
}

#[derive(Debug, Clone)]
pub struct RepoEvent {
pub name: String,
pub payload: Vec<(String, Value)>,
}

impl RepoEvent {
pub fn new(name: &str) -> Self {
Self {
name: name.to_owned(),
payload: vec![],
}
}

pub fn with_payload<T: Serialize + Clone>(mut self, name: &str, payload: &T) -> Self {
self.payload.push((
name.to_owned(),
serde_json::to_value(payload.clone()).unwrap(),
));
self
}

pub fn add_payload<T: Serialize + Clone>(&mut self, name: &str, payload: &T) {
self.payload.push((
name.to_owned(),
serde_json::to_value(payload.clone()).unwrap(),
));
}
}

#[derive(Debug, Clone, Serialize)]
pub struct EventData {
kind: EventKind,
@@ -261,6 +291,17 @@ impl RudderHub {
..Default::default()
}));
}

pub fn track_repo(&self, event: RepoEvent, user: &crate::webserver::middleware::User) {
self.send(Message::Track(Track {
user_id: Some(self.tracking_id(user.username())),
event: "track_repo_index".into(),
properties: Some(serde_json::json!({
"payload": event.payload
})),
..Default::default()
}));
}
}

impl From<Option<String>> for DeviceId {
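Taken together, the `RepoEvent` builder and the new `track_repo` hook let the indexer assemble a payload incrementally and ship it as a single `track_repo_index` event. A minimal usage sketch, assuming a `RudderHub` handle and a request `User` are already in scope (the literal payload values are illustrative):

```rust
// Assumed context: `hub: &RudderHub`, `user: &crate::webserver::middleware::User`.
let mut event = RepoEvent::new("index")
    .with_payload("reporef", &"github.com/BloopAI/bloop")
    .with_payload("provider", &"github");

// Fields can also be appended in place as the job progresses.
event.add_payload("chunk_count", &1234usize);

// Emits a RudderStack track call named "track_repo_index" carrying the payload above.
hub.track_repo(event, user);
```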
2 changes: 1 addition & 1 deletion server/bleep/src/background/sync.rs
@@ -19,7 +19,7 @@ pub struct SyncHandle {
pub(crate) filter_updates: FilterUpdate,
pub(crate) pipes: SyncPipes,
pub(crate) file_cache: FileCache,
app: Application,
pub(crate) app: Application,
exited: flume::Sender<SyncStatus>,
exit_signal: flume::Receiver<SyncStatus>,
}
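The only change to `sync.rs` widens the visibility of `app` so that indexing code elsewhere in the crate can reach the application state, and through it the analytics hub, from a `SyncHandle`. A purely illustrative sketch of why that matters (the `analytics` accessor on `Application` is an assumption, not shown in this PR):

```rust
// Hypothetical helper in another crate module; reaching the hub via `handle.app`
// only became possible once the field was made pub(crate).
fn report_sync(handle: &SyncHandle, event: RepoEvent, user: &User) {
    if let Some(hub) = handle.app.analytics.as_ref() {
        hub.track_repo(event, user);
    }
}
```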
39 changes: 29 additions & 10 deletions server/bleep/src/cache.rs
@@ -150,6 +150,19 @@ pub struct FileCache {
embed_queue: EmbedQueue,
}

#[derive(Default)]
pub struct InsertStats {
pub new: usize,
pub updated: usize,
pub deleted: usize,
}

impl InsertStats {
fn empty() -> Self {
Self::default()
}
}

impl<'a> FileCache {
pub(crate) fn new(db: SqlDb, semantic: Semantic) -> Self {
Self {
@@ -396,7 +409,7 @@ impl<'a> FileCache {
buffer: &str,
lang_str: &str,
branches: &[String],
) {
) -> InsertStats {
let chunk_cache = self.chunks_for_file(repo_ref, cache_keys).await;
self.semantic
.chunks_for_buffer(
@@ -416,14 +429,16 @@
});

match chunk_cache.commit().await {
Ok((new, updated, deleted)) => {
Ok(stats) => {
info!(
repo_name,
relative_path, new, updated, deleted, "Successful commit"
)
relative_path, stats.new, stats.updated, stats.deleted, "Successful commit"
);
stats
}
Err(err) => {
warn!(repo_name, relative_path, ?err, "Failed to upsert vectors")
warn!(repo_name, relative_path, ?err, "Failed to upsert vectors");
InsertStats::empty()
}
}
}
@@ -578,16 +593,20 @@ impl<'a> ChunkCache<'a> {
/// Since qdrant changes are pipelined on their end, data written
/// here is not necessarily available for querying when the
/// commit's completed.
pub async fn commit(self) -> anyhow::Result<(usize, usize, usize)> {
pub async fn commit(self) -> anyhow::Result<InsertStats> {
let mut tx = self.sql.begin().await?;

let update_size = self.commit_branch_updates(&mut tx).await?;
let delete_size = self.commit_deletes(&mut tx).await?;
let new_size = self.commit_inserts(&mut tx).await?;
let updated = self.commit_branch_updates(&mut tx).await?;
let deleted = self.commit_deletes(&mut tx).await?;
let new = self.commit_inserts(&mut tx).await?;

tx.commit().await?;

Ok((new_size, update_size, delete_size))
Ok(InsertStats {
new,
updated,
deleted,
})
}

/// Insert new additions to sqlite
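With the positional `(usize, usize, usize)` tuple replaced by the named `InsertStats` struct, the file-level insert path now returns the counters instead of discarding them. A rough sketch of what a caller can do with the result; `insert_chunks` is a stand-in name for the `FileCache` method whose signature changes above (its real name sits outside the visible hunk), and `worker_stats` is the per-worker accumulator added in `indexes/analytics.rs`:

```rust
// Hypothetical caller inside an indexing worker.
let stats: InsertStats = file_cache
    .insert_chunks(&repo_ref, &cache_keys, buffer, lang_str, &branches)
    .await;

// One plausible way to fold the per-file counts into the worker's running totals.
worker_stats.chunks += stats.new;
worker_stats.reindex_count += stats.new + stats.updated + stats.deleted;
```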
9 changes: 8 additions & 1 deletion server/bleep/src/indexes.rs
@@ -10,6 +10,7 @@ use tantivy::{
DocAddress, Document, IndexReader, IndexWriter, Score,
};

mod analytics;
pub mod doc;
pub mod file;
pub mod reader;
@@ -79,11 +80,16 @@ pub struct Indexes {
pub repo: Indexer<Repo>,
pub file: Indexer<File>,
pub doc: Doc,
was_index_reset: bool,
write_mutex: tokio::sync::Mutex<()>,
}

impl Indexes {
pub async fn new(config: &Configuration, sql: crate::SqlDb) -> Result<Self> {
pub async fn new(
config: &Configuration,
sql: crate::SqlDb,
was_index_reset: bool,
) -> Result<Self> {
Ok(Self {
repo: Indexer::create(
Repo::new(),
@@ -104,6 +110,7 @@ impl Indexes {
config.max_threads,
)?,
write_mutex: Default::default(),
was_index_reset,
})
}

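`Indexes::new` now takes a `was_index_reset` flag so the analytics layer can tell a schema-driven reindex apart from an ordinary sync. A hedged sketch of the call site; the startup logic that computes the flag is not part of this diff:

```rust
// Illustrative only: in practice the flag would be derived from whatever startup
// check decides to wipe and rebuild the on-disk index (e.g. a schema version bump).
let was_index_reset = index_dir_was_wiped;
let indexes = Indexes::new(&config, sql.clone(), was_index_reset).await?;
```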
120 changes: 120 additions & 0 deletions server/bleep/src/indexes/analytics.rs
@@ -0,0 +1,120 @@
use crate::{analytics::RepoEvent, repo::RepoRef};
use tokio::{sync::mpsc, time::Instant};

#[derive(Default)]
pub struct WorkerStats {
// size in bytes
pub size: usize,
// number of qdrant chunks
pub chunks: usize,
// number of dir-entries reindexed by this worker
pub reindex_count: usize,
}

impl std::ops::AddAssign for WorkerStats {
fn add_assign(&mut self, rhs: Self) {
self.size += rhs.size;
self.chunks += rhs.chunks;
self.reindex_count += rhs.reindex_count;
}
}

#[derive(serde::Serialize, Clone, Copy, PartialEq, Eq)]
enum IndexJobKind {
Index,
PeriodicSync { reindex_file_count: usize },
SchemaUpgrade { reindex_file_count: usize },
}

// the main entrypoint into gathering analytics for an index job
pub struct StatsGatherer {
// receiver of stats from worker threads
stats_rx: mpsc::UnboundedReceiver<WorkerStats>,
// pass this along to each worker thread
stats_tx: mpsc::UnboundedSender<WorkerStats>,
// the reporef of the target index job
reporef: RepoRef,
// the moment this job began
start_time: Instant,
// set to true if this is the first index of this reporef
pub is_first_index: bool,
// set to true if the index was reset on startup
pub was_index_reset: bool,
// gather analytics events into this `event` field
pub event: RepoEvent,
// combine stats from each worker thread into `repo_stats`
pub repo_stats: WorkerStats,
}

impl StatsGatherer {
pub fn for_repo(reporef: RepoRef) -> Self {
let (stats_tx, stats_rx) = mpsc::unbounded_channel();
Self {
stats_rx,
stats_tx,
event: RepoEvent::new("index"),
reporef,
is_first_index: false,
was_index_reset: false,
start_time: Instant::now(),
repo_stats: WorkerStats::default(),
}
}

pub fn sender(&self) -> mpsc::UnboundedSender<WorkerStats> {
self.stats_tx.clone()
}

#[rustfmt::skip]
pub async fn finish(mut self) -> RepoEvent {
// aggregate stats
self.stats_rx.close();
while let Some(stats) = self.stats_rx.recv().await {
self.repo_stats += stats;
}

// determine the type of index job run
//
let job_kind = if self.was_index_reset {
IndexJobKind::SchemaUpgrade {
reindex_file_count: self.repo_stats.reindex_count,
}
} else if self.is_first_index {
IndexJobKind::Index
} else {
IndexJobKind::PeriodicSync {
reindex_file_count: self.repo_stats.reindex_count,
}
};

self.event.add_payload("reporef", &self.reporef.name());
self.event.add_payload("provider", &self.reporef.backend());
self.event.add_payload("index_job_kind", &job_kind);
self.event.add_payload("chunk_count", &self.repo_stats.chunks);
self.event.add_payload("bytes", &human_readable(self.repo_stats.size));
self.event.add_payload("sync_time", &format!("{:?}", self.start_time.elapsed()));
self.event
}
}

fn human_readable(size: usize) -> String {
let suffixes = ["B", "KB", "MB", "GB"];
let s = suffixes
.iter()
.zip(0..10)
.rev()
.map(|(suf, exp)| (suf, size as f64 / (1024_f64.powi(exp))))
.find(|(_, t)| t >= &1.0);
s.map(|(suffix, value)| format!("{value:.2}{suffix}"))
.unwrap_or_else(|| size.to_string())
}

#[cfg(test)]
mod test {
#[test]
fn human_readable() {
assert_eq!(super::human_readable(15), "15.00B");
assert_eq!(super::human_readable(1024), "1.00KB");
assert_eq!(super::human_readable(7616597515), "7.09GB");
}
}
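Putting the pieces together, an index job is expected to drive `StatsGatherer` roughly as follows; everything outside `indexes/analytics.rs` here (the hub, the user, and how the two boolean flags are derived) is an assumption based on the rest of the PR:

```rust
// Assumed context: `reporef: RepoRef`, `was_index_reset: bool`,
// `hub: &RudderHub`, `user: &User`.
let mut gatherer = StatsGatherer::for_repo(reporef.clone());
gatherer.is_first_index = previous_index_time.is_none(); // illustrative condition
gatherer.was_index_reset = was_index_reset;              // plumbed in via Indexes::new

// Each worker thread gets its own sender and reports totals as it finishes files.
let tx = gatherer.sender();
tx.send(WorkerStats { size: 4096, chunks: 12, reindex_count: 1 }).ok();

// Once the workers are done, fold the stats into a RepoEvent and ship it.
let event = gatherer.finish().await;
hub.track_repo(event, user);
```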