add repo sync analytics (#1122)
* add repo sync analytics

tracks repo metadata on the first index, or on a full reindex (when the
tantivy index hash is updated, for example). the event includes the
following info:

- repo ref
- repo provider
- size in bytes
- number of qdrant chunks
- time taken to index

sample event:

    RepoEvent {
        name: "index",
        payload: [
            ("repo_ref", String("nerdypepper/dijo")),
            ("provider", String("github")),
            ("file_count", Number(35)),
            ("chunk_count", Number(236)),
            ("bytes", String("116.59KB")),
            ("sync_time", String("4.957807429s")),
        ],
    }

---

to test this changeset, set the following values in `local_config.json`:

    {
        "analytics_key": "..",
        "analytics_data_plane": ".."
    }

run rudderstack in "live event stream" mode, and index a repository.

* track reindexes

- marks full index resets as `SchemaUpgrade`s
- marks periodic reindex jobs as `PeriodicSync`s

* clippy

* address review comments
oppiliappan authored Nov 15, 2023
1 parent 6274268 commit c66e99a
Showing 8 changed files with 258 additions and 19 deletions.
41 changes: 41 additions & 0 deletions server/bleep/src/analytics.rs
@@ -73,6 +73,36 @@ impl DocEvent {
}
}

#[derive(Debug, Clone)]
pub struct RepoEvent {
pub name: String,
pub payload: Vec<(String, Value)>,
}

impl RepoEvent {
pub fn new(name: &str) -> Self {
Self {
name: name.to_owned(),
payload: vec![],
}
}

pub fn with_payload<T: Serialize + Clone>(mut self, name: &str, payload: &T) -> Self {
self.payload.push((
name.to_owned(),
serde_json::to_value(payload.clone()).unwrap(),
));
self
}

pub fn add_payload<T: Serialize + Clone>(&mut self, name: &str, payload: &T) {
self.payload.push((
name.to_owned(),
serde_json::to_value(payload.clone()).unwrap(),
));
}
}

#[derive(Debug, Clone, Serialize)]
pub struct EventData {
kind: EventKind,
@@ -261,6 +291,17 @@ impl RudderHub {
..Default::default()
}));
}

pub fn track_repo(&self, event: RepoEvent, user: &crate::webserver::middleware::User) {
self.send(Message::Track(Track {
user_id: Some(self.tracking_id(user.username())),
event: "track_repo_index".into(),
properties: Some(serde_json::json!({
"payload": event.payload
})),
..Default::default()
}));
}
}

impl From<Option<String>> for DeviceId {
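As a rough usage sketch (not part of this diff — `hub` and `user` are assumed
stand-ins for the real `RudderHub` handle and request user), an event would be
assembled with the builder-style API above and handed to `track_repo`:

    // hypothetical call site; `hub: &RudderHub` and `user: &middleware::User`
    // are assumed to be available from the surrounding application state
    let event = RepoEvent::new("index")
        .with_payload("repo_ref", &"nerdypepper/dijo")
        .with_payload("provider", &"github");
    hub.track_repo(event, user);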
2 changes: 1 addition & 1 deletion server/bleep/src/background/sync.rs
@@ -19,7 +19,7 @@ pub struct SyncHandle {
pub(crate) filter_updates: FilterUpdate,
pub(crate) pipes: SyncPipes,
pub(crate) file_cache: FileCache,
app: Application,
pub(crate) app: Application,
exited: flume::Sender<SyncStatus>,
exit_signal: flume::Receiver<SyncStatus>,
}
39 changes: 29 additions & 10 deletions server/bleep/src/cache.rs
@@ -150,6 +150,19 @@ pub struct FileCache {
embed_queue: EmbedQueue,
}

#[derive(Default)]
pub struct InsertStats {
pub new: usize,
pub updated: usize,
pub deleted: usize,
}

impl InsertStats {
fn empty() -> Self {
Self::default()
}
}

impl<'a> FileCache {
pub(crate) fn new(db: SqlDb, semantic: Semantic) -> Self {
Self {
@@ -396,7 +409,7 @@ impl<'a> FileCache {
buffer: &str,
lang_str: &str,
branches: &[String],
) {
) -> InsertStats {
let chunk_cache = self.chunks_for_file(repo_ref, cache_keys).await;
self.semantic
.chunks_for_buffer(
@@ -416,14 +429,16 @@ impl<'a> FileCache {
});

match chunk_cache.commit().await {
Ok((new, updated, deleted)) => {
Ok(stats) => {
info!(
repo_name,
relative_path, new, updated, deleted, "Successful commit"
)
relative_path, stats.new, stats.updated, stats.deleted, "Successful commit"
);
stats
}
Err(err) => {
warn!(repo_name, relative_path, ?err, "Failed to upsert vectors")
warn!(repo_name, relative_path, ?err, "Failed to upsert vectors");
InsertStats::empty()
}
}
}
@@ -578,16 +593,20 @@ impl<'a> ChunkCache<'a> {
/// Since qdrant changes are pipelined on their end, data written
/// here is not necessarily available for querying when the
/// commit's completed.
pub async fn commit(self) -> anyhow::Result<(usize, usize, usize)> {
pub async fn commit(self) -> anyhow::Result<InsertStats> {
let mut tx = self.sql.begin().await?;

let update_size = self.commit_branch_updates(&mut tx).await?;
let delete_size = self.commit_deletes(&mut tx).await?;
let new_size = self.commit_inserts(&mut tx).await?;
let updated = self.commit_branch_updates(&mut tx).await?;
let deleted = self.commit_deletes(&mut tx).await?;
let new = self.commit_inserts(&mut tx).await?;

tx.commit().await?;

Ok((new_size, update_size, delete_size))
Ok(InsertStats {
new,
updated,
deleted,
})
}

/// Insert new additions to sqlite
9 changes: 8 additions & 1 deletion server/bleep/src/indexes.rs
@@ -10,6 +10,7 @@ use tantivy::{
DocAddress, Document, IndexReader, IndexWriter, Score,
};

mod analytics;
pub mod doc;
pub mod file;
pub mod reader;
@@ -79,11 +80,16 @@ pub struct Indexes {
pub repo: Indexer<Repo>,
pub file: Indexer<File>,
pub doc: Doc,
was_index_reset: bool,
write_mutex: tokio::sync::Mutex<()>,
}

impl Indexes {
pub async fn new(config: &Configuration, sql: crate::SqlDb) -> Result<Self> {
pub async fn new(
config: &Configuration,
sql: crate::SqlDb,
was_index_reset: bool,
) -> Result<Self> {
Ok(Self {
repo: Indexer::create(
Repo::new(),
@@ -104,6 +110,7 @@ impl Indexes {
config.max_threads,
)?,
write_mutex: Default::default(),
was_index_reset,
})
}

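The call site for `Indexes::new` lives in the application setup code and is
not part of this diff; roughly, it would now thread the reset flag through,
along the lines of:

    // hypothetical sketch of the updated call site (surrounding names assumed)
    let indexes = Indexes::new(&config, sql.clone(), was_index_reset).await?;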
120 changes: 120 additions & 0 deletions server/bleep/src/indexes/analytics.rs
@@ -0,0 +1,120 @@
use crate::{analytics::RepoEvent, repo::RepoRef};
use tokio::{sync::mpsc, time::Instant};

#[derive(Default)]
pub struct WorkerStats {
// size in bytes
pub size: usize,
// number of qdrant chunks
pub chunks: usize,
// number of dir-entries reindexed by this worker
pub reindex_count: usize,
}

impl std::ops::AddAssign for WorkerStats {
fn add_assign(&mut self, rhs: Self) {
self.size += rhs.size;
self.chunks += rhs.chunks;
self.reindex_count += rhs.reindex_count;
}
}

#[derive(serde::Serialize, Clone, Copy, PartialEq, Eq)]
enum IndexJobKind {
Index,
PeriodicSync { reindex_file_count: usize },
SchemaUpgrade { reindex_file_count: usize },
}

// the main entrypoint into gathering analytics for an index job
pub struct StatsGatherer {
// receiver of stats from worker threads
stats_rx: mpsc::UnboundedReceiver<WorkerStats>,
// pass this along to each worker thread
stats_tx: mpsc::UnboundedSender<WorkerStats>,
// the reporef of the target index job
reporef: RepoRef,
// the moment this job began
start_time: Instant,
// set to true if this is the first index of this reporef
pub is_first_index: bool,
// set to true if the index was reset on startup
pub was_index_reset: bool,
// gather analytics events into this `event` field
pub event: RepoEvent,
// combine stats from each worker thread into `repo_stats`
pub repo_stats: WorkerStats,
}

impl StatsGatherer {
pub fn for_repo(reporef: RepoRef) -> Self {
let (stats_tx, stats_rx) = mpsc::unbounded_channel();
Self {
stats_rx,
stats_tx,
event: RepoEvent::new("index"),
reporef,
is_first_index: false,
was_index_reset: false,
start_time: Instant::now(),
repo_stats: WorkerStats::default(),
}
}

pub fn sender(&self) -> mpsc::UnboundedSender<WorkerStats> {
self.stats_tx.clone()
}

#[rustfmt::skip]
pub async fn finish(mut self) -> RepoEvent {
// aggregate stats
self.stats_rx.close();
while let Some(stats) = self.stats_rx.recv().await {
self.repo_stats += stats;
}

// determine the type of index job run
//
let job_kind = if self.was_index_reset {
IndexJobKind::SchemaUpgrade {
reindex_file_count: self.repo_stats.reindex_count,
}
} else if self.is_first_index {
IndexJobKind::Index
} else {
IndexJobKind::PeriodicSync {
reindex_file_count: self.repo_stats.reindex_count,
}
};

self.event.add_payload("reporef", &self.reporef.name());
self.event.add_payload("provider", &self.reporef.backend());
self.event.add_payload("index_job_kind", &job_kind);
self.event.add_payload("chunk_count", &self.repo_stats.chunks);
self.event.add_payload("bytes", &human_readable(self.repo_stats.size));
self.event.add_payload("sync_time", &format!("{:?}", self.start_time.elapsed()));
self.event
}
}

fn human_readable(size: usize) -> String {
let suffixes = ["B", "KB", "MB", "GB"];
let s = suffixes
.iter()
.zip(0..10)
.rev()
.map(|(suf, exp)| (suf, size as f64 / (1024_f64.powi(exp))))
.find(|(_, t)| t >= &1.0);
s.map(|(suffix, value)| format!("{value:.2}{suffix}"))
.unwrap_or_else(|| size.to_string())
}

#[cfg(test)]
mod test {
#[test]
fn human_readable() {
assert_eq!(super::human_readable(15), "15.00B");
assert_eq!(super::human_readable(1024), "1.00KB");
assert_eq!(super::human_readable(7616597515), "7.09GB");
}
}
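
Putting the new module together, a simplified sketch of the gatherer's
lifecycle (the loop below stands in for the real indexing workers, and
`reporef` is assumed to already exist for the repository being synced):

    // a StatsGatherer is created per sync job
    let mut gatherer = StatsGatherer::for_repo(reporef);
    gatherer.is_first_index = true;

    // each worker gets a clone of the sender and reports its stats
    let tx = gatherer.sender();
    for _ in 0..4 {
        tx.send(WorkerStats { size: 1024, chunks: 8, reindex_count: 0 }).unwrap();
    }
    drop(tx);

    // finish() drains the channel, picks the IndexJobKind and builds the RepoEvent
    let event = gatherer.finish().await;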