diff --git a/lib/workload/stateless/stacks/filemanager/README.md b/lib/workload/stateless/stacks/filemanager/README.md index 31c525e85..4300674b5 100644 --- a/lib/workload/stateless/stacks/filemanager/README.md +++ b/lib/workload/stateless/stacks/filemanager/README.md @@ -86,7 +86,7 @@ Alternatively, just `brew install dbeaver-community` to easily browse the databa ## Local API server -To use the local API server, run: +For more details on the filemanager API, see the [`API_GUIDE.md`][api-guide]. To use the local API server, run: ```sh make api @@ -125,7 +125,7 @@ docker system prune -a --volumes ## Architecture The filemanager ingest functionality operates to ensure eventual consistency in the database records. See the -[ARCHITECTURE.md][architecture] for more details. +[`ARCHITECTURE.md`][architecture] for more details. ## Project Layout @@ -142,6 +142,7 @@ The project is divided into multiple crates that serve different functionality. * [database]: Database migration files and queries. [architecture]: docs/ARCHITECTURE.md +[api-guide]: docs/API_GUIDE.md [filemanager]: filemanager [filemanager-api-lambda]: filemanager-api-lambda [filemanager-api-server]: filemanager-api-server diff --git a/lib/workload/stateless/stacks/filemanager/docs/API_GUIDE.md b/lib/workload/stateless/stacks/filemanager/docs/API_GUIDE.md new file mode 100644 index 000000000..41c92d047 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/docs/API_GUIDE.md @@ -0,0 +1,169 @@ +# Filemanager API + +The filemanager API gives access to S3 object records for all [S3 file events][s3-events] which are recorded in the database. + +To start a local API server and view the OpenAPI documentation, run the following: + +```sh +make api +``` + +This serves Swagger OpenAPI docs at `http://localhost:8000/swagger_ui` when using default settings. + +The deployed instance of the filemanager API can be reached using the desired stage at `https://file..umccr.org` +using the orcabus API token. 
To retrieve the token, run: + +```sh +export TOKEN=$(aws secretsmanager get-secret-value --secret-id orcabus/token-service-jwt --output json --query SecretString | jq -r 'fromjson | .id_token') +``` + +## Querying records + +The API is designed to have a standard set of REST routes which can be used to query for records. The API is versioned with a +`/api/v1` route prefix, and S3 object records can be reached under `/api/v1/s3_objects`. + +For example, to query a single record, use the `s3_object_id` in the path, which returns the JSON record: + +```sh +curl -H "Authorization: Bearer $TOKEN" "https://file.dev.umccr.org/api/v1/s3_objects/0190465f-68fa-76e4-9c36-12bdf1a1571d" | jq +``` + +Multiple records can be reached using the same route, which returns an array of JSON records: + +```sh +curl -H "Authorization: Bearer $TOKEN" "https://file.dev.umccr.org/api/v1/s3_objects" | jq +``` + +This route is paginated, and by default returns 1000 records from the first page in a JSON list response: + +```json +{ + "next_page": 1, + "results": [ + "..." + ] +} +``` + +Use the `page` and `page_size` query parameters to control the pagination: + +```sh +curl -H "Authorization: Bearer $TOKEN" "https://file.dev.umccr.org/api/v1/s3_objects?page=10&page_size=50" | jq +``` + +The records can be filtered using the same fields from the record by naming the field in a query parameter. +For example, query all records for a certain bucket: + +```sh +curl -H "Authorization: Bearer $TOKEN" "https://file.dev.umccr.org/api/v1/s3_objects?bucket=umccr-temp-dev" | jq +``` + +Since the filemanager database keeps a copy of all S3 events that it receives, old records for deleted objects +are also kept in the database. 
In order to retrieve only current objects, that is, objects that are still in S3 and +don't have an associated `Deleted` event, use the `current_state` query parameter: + +```sh +curl -H "Authorization: Bearer $TOKEN" "https://file.dev.umccr.org/api/v1/s3_objects?current_state=true" | jq +``` + +### Attributes + +The filemanager has the ability to save JSON attributes on any record. Attributes can be used to query similar to +filtering on record fields. The syntax for attribute querying uses square brackets to access nested JSON fields, similar +to the syntax defined by the [qs] npm package. Brackets should be percent-encoded in URLs. + +For example, query for a previously set `portal_run_id`: + +```sh +curl --get -H "Authorization: Bearer $TOKEN" --data-urlencode "attributes[portal_run_id]=202405212aecb782" \ +"https://file.dev.umccr.org/api/v1/s3_objects" | jq +``` + +> [!NOTE] +> Attributes on filemanager records start empty. They need to be added to the record to query on them later. +> See [updating records](#updating-records) + +### Wildcard matching + +The API supports using wildcards to match multiple characters in a value for most fields. Use `%` to match multiple characters +and `_` to match one character. These queries get converted to postgres `like` queries under the hood. 
For example, query +on a key prefix: + +```sh +curl --get -H "Authorization: Bearer $TOKEN" --data-urlencode "key=temp\_data%" \ +"https://file.dev.umccr.org/api/v1/s3_objects" | jq +``` + +Case-insensitive wildcard matching, which gets converted to a postgres `ilike` statement, is supported by using `case_sensitive`: + +```sh +curl --get -H "Authorization: Bearer $TOKEN" --data-urlencode "key=temp\_data%" \ +"https://file.dev.umccr.org/api/v1/s3_objects?case_sensitive=false" | jq +``` + +Wildcard matching is also supported on attributes: + +```sh +curl --get -H "Authorization: Bearer $TOKEN" --data-urlencode "attributes[portal_run_id]=20240521%" \ +"https://file.dev.umccr.org/api/v1/s3_objects" | jq +``` + +## Updating records + +As part of allowing filemanager to link and query on attributes, attributes can be updated using PATCH requests. +Each of the above endpoints and queries supports a PATCH request which can be used to update attributes on a set +of records, instead of listing records. All query parameters except pagination are supported for updates. +Attributes are updated using [JSON patch][json-patch]. 
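To make the JSON patch semantics concrete, here is a minimal Rust sketch of how `add`, `replace` and `remove` operations behave on a record's attributes. It is an illustration only, not the filemanager implementation: `PatchOp`, `top_level_key` and `apply_patch` are hypothetical names, attributes are modelled as a flat map of strings, and only single-segment paths such as `/portal_run_id` are handled (no nested pointers or `~0`/`~1` escapes).

```rust
use std::collections::BTreeMap;

/// Hypothetical subset of JSON patch operations on top-level string attributes.
enum PatchOp {
    Add { path: String, value: String },
    Replace { path: String, value: String },
    Remove { path: String },
}

/// Strip the leading `/` from a single-segment JSON pointer such as `/portal_run_id`.
/// Nested or empty pointers are rejected in this simplified model.
fn top_level_key(path: &str) -> Result<&str, String> {
    match path.strip_prefix('/') {
        Some(key) if !key.is_empty() && !key.contains('/') => Ok(key),
        _ => Err(format!("unsupported path: {path}")),
    }
}

/// Apply the operations in order. Unlike a full JSON patch implementation, this
/// sketch is not atomic: operations before a failing one stay applied.
fn apply_patch(attributes: &mut BTreeMap<String, String>, ops: &[PatchOp]) -> Result<(), String> {
    for op in ops {
        match op {
            // `add` creates the member, or replaces its value if it already exists.
            PatchOp::Add { path, value } => {
                attributes.insert(top_level_key(path)?.to_string(), value.clone());
            }
            // `replace` requires the member to exist.
            PatchOp::Replace { path, value } => {
                let key = top_level_key(path)?;
                match attributes.get_mut(key) {
                    Some(existing) => *existing = value.clone(),
                    None => return Err(format!("no attribute at {path}")),
                }
            }
            // `remove` also requires the member to exist.
            PatchOp::Remove { path } => {
                let key = top_level_key(path)?;
                if attributes.remove(key).is_none() {
                    return Err(format!("no attribute at {path}"));
                }
            }
        }
    }
    Ok(())
}
```

Under these assumptions, an `add` on `/portal_run_id` sets the attribute whether or not it was present, while `replace` and `remove` fail for a missing attribute.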
+ +For example, update attributes on a single record: + +```sh +curl -X PATCH -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ +--data '{ "attributes": [ { "op": "add", "path": "/portal_run_id", "value": "202405212aecb782" } ] }' \ +"https://file.dev.umccr.org/api/v1/s3_objects/0190465f-68fa-76e4-9c36-12bdf1a1571d" | jq +``` + +Or, update attributes for multiple records whose keys match the same wildcard pattern: + +```sh +curl -X PATCH -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \ +--data '{ "attributes": [ { "op": "add", "path": "/portal_run_id", "value": "202405212aecb782" } ] }' \ +"https://file.dev.umccr.org/api/v1/s3_objects?key=%25202405212aecb782%25" | jq +``` + +## Count objects + +There is an API route which counts the total number of records in the database, which supports +similar query parameters as the regular list operations. + +For example, count the total records: + +```sh +curl -H "Authorization: Bearer $TOKEN" "https://file.dev.umccr.org/api/v1/s3_objects/count" | jq +``` + +## The `objects` record + +There is a similar record kept in the filemanager database called `object`. A similar REST API +is available for these records under `/api/v1/objects`, however the `object` records currently don't serve +a purpose in filemanager. They were initially included to support attribute linking, however they will likely +be removed because attribute linking can be accomplished using the `attributes` column on `s3_object`. + +## Some missing features + +There are some missing features in the query API which are planned, namely: + +* There is no way to compare values with `>`, `>=`, `<`, `<=`. +* There is no way to express `and` or `or` conditions in the API. + +There are also some features missing for attribute linking. For example, there is no way +to capture matching wildcard groups which can later be used in the JSON patch body. 
+ +There is also no way to POST an attribute linking rule, which can be used to update S3 records +as they are received by filemanager. See [ATTRIBUTE_LINKING.md][attribute-linking] for a discussion on some approaches +for this. The likely solution will involve merging the above wildcard matching logic with attribute rules. + +[json-patch]: https://jsonpatch.com/ +[qs]: https://github.com/ljharb/qs +[s3-events]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html +[attribute-linking]: ATTRIBUTE_LINKING.md \ No newline at end of file diff --git a/lib/workload/stateless/stacks/filemanager/docs/ATTRIBUTE_LINKING.md b/lib/workload/stateless/stacks/filemanager/docs/ATTRIBUTE_LINKING.md new file mode 100644 index 000000000..fa3632158 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/docs/ATTRIBUTE_LINKING.md @@ -0,0 +1,145 @@ +> [!IMPORTANT] +> This document is a discussion on potential designs for filemanager attribute linking. The current implementation +> isn't exactly like this, although it will probably contain components of the design here. + +# FileManager Attribute Linking + +The filemanager needs to be able to store data from other microservices on `s3_object` records in order to perform some +logic, e.g. querying on the stored data. Ideally the filemanager should only deal with object records, without having +to know the domain of other microservices. This means that the filemanager needs a mechanism to store arbitrary +data, and be told what data to store for a given record. + +## Storing attributes + +The first part of the problem, i.e. storing data on records is solved by using the `attributes` column on `s3_object`. +The `attributes` can store arbitrary JSON data, which can be queried for using postgres JSON extensions. +E.g. 
Fetching all objects where the attributes contain an 'attribute_id' looks like: + +```sql +select * from s3_object where attributes @> '{ "attribute_id": "some_id" }'; +``` + +From the API a nested style syntax can be used: + +``` +/api/v1/s3_objects?attributes[attribute_id]=some_id +``` + +Attributes can be used by other services to perform logic, e.g. the UI could fetch all objects where +the `portal_run_id` equals ``. + +## Knowing what attributes to store + +The filemanager needs to be told by other services what attributes to store. However, it only receives native S3 events +as input which cannot be edited. + +A generalisable way to accomplish this is to allow the filemanager to accept a set of 'rules', which given +an S3 event as input, determine the attribute. For example, a rule could be: + +* For all events where the bucket equals 'umccr-temp-dev', and the key starts with a prefix 'analysis_data/.../.../.../...' +extract the 4th path segment and add the attribute: `{ "portal_run_id": "<4th path segment>" }`. + +### Rules engine + +The microservice which knows about the rule could tell the filemanager about it. The rules could be published on the +event bus and use a JSON rules engine, similar to the way EventBridge rules are parsed. + +For example, the workflow manager could tell filemanager a rule about matching buckets/keys using the following event message: + +```json +{ + "detail-type": [ + "FileManagerAttributeRule" + ], + "source": [ + "orcabus.workflowmanager" + ], + "detail": { + "rule": { + "bucket": "umccr-temp-dev", + "key": "some_prefix/(*)/*" + }, + "apply": { + "some_attribute_id": "" + }, + "start_from": "", + "expires": "" + } +} +``` + +This rule would match all S3 events that have 'umccr-temp-dev' as the bucket, and keys with 'some_prefix' containing a +regex capture group. The rule only applies to events received between `start_from` and `expires`. 
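As a rough illustration of how such a rule could be evaluated against an S3 event, the following Rust sketch matches a bucket and a key pattern and returns the captured path segments. Everything here is an assumption made for illustration: `Rule` and `match_rule` are hypothetical names, the pattern is interpreted per path segment rather than as a real regex (`(*)` captures exactly one segment, `*` matches one segment, and a trailing `*` matches whatever is left), and the `start_from`/`expires` window is ignored.

```rust
/// Hypothetical rule shape, loosely following the `rule` object in the event above.
struct Rule {
    bucket: String,
    key_pattern: String,
}

/// Returns the captured segments if the bucket and key match the rule, `None` otherwise.
fn match_rule(rule: &Rule, bucket: &str, key: &str) -> Option<Vec<String>> {
    if rule.bucket != bucket {
        return None;
    }

    let pattern: Vec<&str> = rule.key_pattern.split('/').collect();
    let segments: Vec<&str> = key.split('/').collect();
    let mut captures = Vec::new();

    for (i, part) in pattern.iter().enumerate() {
        let is_last = i == pattern.len() - 1;
        match *part {
            // A trailing `*` accepts everything that is left, as long as something is left.
            "*" if is_last => return (segments.len() > i).then_some(captures),
            // `*` and `(*)` both consume exactly one segment; only `(*)` records it.
            "*" | "(*)" => {
                let segment = segments.get(i)?;
                if *part == "(*)" {
                    captures.push(segment.to_string());
                }
            }
            // Anything else must match the key segment literally.
            literal => {
                if segments.get(i) != Some(&literal) {
                    return None;
                }
            }
        }
    }

    // Without a trailing `*`, the key must not have extra segments.
    (segments.len() == pattern.len()).then_some(captures)
}
```

With this interpretation, the captured segment could then be substituted into the `apply` object to produce the attribute for the matching record.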
+ +Filemanager would store this rule, and check existing rules for each S3 event to see if it needs to add +attributes. If the rule is received by filemanager after an event has already fired, that's okay, the filemanager can +apply the rule retroactively to its database records. + +The advantage of this approach is that it is quite general, and it means that the filemanager doesn't need to know any +details about other microservices' logic/domains. Rules also don't need to be emitted by services to be used. For example, +statically derived attributes that only need information from the S3 event could be initialized into the filemanager +database as it's deployed. + +Rules could be updated if required. There could also be different operators that merge attributes in different ways, +e.g. 'append attribute', 'append if not exists', 'update attribute', 'overwrite attribute', etc. + +A potential disadvantage is that the rules engine may not be flexible enough to accommodate all attribute requirements. +E.g. it's not possible to execute arbitrary code to compute the attribute. + +### Technical challenges + +It doesn't seem like there are many implementations of JSON rules engines. In Rust there is [json-rules-engine-rs], +which seems to be based on the JavaScript [json-rules-engine]. Notably, there is +[Event Ruler][aws-event-ruler] which is a Java library and what AWS EventBridge rules use. Calling a Java library +from Rust would require some FFI bindings. + +An existing library doesn't have to be used. Since the format of S3 events is known in advance, +a simpler approach would probably involve implementing the rules manually in filemanager, leveraging something like [serde_json]. 
+ +[json-rules-engine-rs]: https://github.com/GopherJ/json-rules-engine-rs +[json-rules-engine]: https://github.com/CacheControl/json-rules-engine +[aws-event-ruler]: https://github.com/aws/event-ruler +[serde_json]: https://github.com/serde-rs/json + +### Architecture + +The architecture of this approach could look something like this, where each service emits rules for the filemanager to +consume: + +![filemanager_attribute_linking](./filemanager_attributes.drawio.svg) + +Here the filemanager stores rules in its database and processes them directly. + +Alternatively, the linking logic could be a separate microservice (FileManagerAttributeManager? ThePreFileManagerManager?): + +![filemanager_attribute_linking_service](./filemanager_attributes_alt.drawio.svg) + +Here the filemanager ingests events that contain additional attributes from another SQS queue, and the +attribute manager consumes events from the S3 queue. In order to update existing records, the filemanager could +accept a POST request to update a set of records that the attribute manager knows about. + +An advantage of this approach is that it can use different languages, which would be useful if using rules libraries like +Event Ruler. + +The disadvantage is that it adds more complexity, and more latency in the S3 event processing, because now +the filemanager is no longer directly consuming S3 events. + +## Alternatives + +Instead of microservices pushing rules into the event bus, the filemanager could query the microservices to decide what +to do with the events. However, this adds many API calls if the filemanager has to query on a per-event basis. + +Instead of reading/parsing rules in JSON, there could be a filemanager extension/plugin system which runs on each S3 +event to determine attributes. This could be separate from the filemanager code, and would work well for +statically derived attributes. 
However, it may also introduce many API calls if the filemanager has to query other microservices +on a per-event basis. + +A combination of these approaches is also possible, where there is some rule matching, and an extension which can +query other microservices/perform complex logic on the matched events only. + +## Questions + +1. Is a rule-based regex-style approach enough to cover all use-cases for generating attributes, or does more complicated + logic need to happen? +2. Are expiry/start dates for rules flexible enough to deal with changes in the rules over time? + diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-api-server/README.md b/lib/workload/stateless/stacks/filemanager/filemanager-api-server/README.md index a2d3a85b7..e8abfe8b7 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-api-server/README.md +++ b/lib/workload/stateless/stacks/filemanager/filemanager-api-server/README.md @@ -1,10 +1,10 @@ # filemanager-api-server An instance of the filemanager api which can be launched as a webserver. The default address which the webserver uses -is `localhost:8080`. Set the `FILEMANAGER_API_SERVER_ADDR` environment variable to change this. To run the local server: +is `0.0.0.0:8000`. Set the `FILEMANAGER_API_SERVER_ADDR` environment variable to change this. To run the local server: ```sh make api ``` -Then, checkout the OpenAPI docs at: `http://localhost:8080/swagger_ui`. \ No newline at end of file +Then, check out the OpenAPI docs at: `http://localhost:8000/swagger_ui`. 
\ No newline at end of file diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs index 0df3c1f1c..d594c16cd 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs @@ -37,7 +37,7 @@ pub enum Error { QueryError(String), #[error("invalid input: `{0}`")] InvalidQuery(String), - #[error("expected some value for id: `{0}`")] + #[error("expected record for id: `{0}`")] ExpectedSomeValue(Uuid), } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs index d3e39e135..7de989f8b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -3,18 +3,22 @@ use sea_orm::prelude::Expr; use sea_orm::sea_query::extension::postgres::PgExpr; -use sea_orm::sea_query::{Alias, Asterisk, PostgresQueryBuilder, Query}; +use sea_orm::sea_query::{ + Alias, Asterisk, ColumnRef, IntoColumnRef, PostgresQueryBuilder, Query, SimpleExpr, +}; use sea_orm::Order::{Asc, Desc}; use sea_orm::{ - ColumnTrait, Condition, ConnectionTrait, EntityTrait, FromQueryResult, PaginatorTrait, - QueryFilter, QueryOrder, QuerySelect, QueryTrait, Select, + ActiveEnum, ColumnTrait, Condition, ConnectionTrait, EntityTrait, FromQueryResult, + IntoSimpleExpr, JsonValue, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, QueryTrait, + Select, Value, }; use tracing::trace; use crate::database::entities::{object, s3_object}; use crate::error::Error::OverflowError; use crate::error::{Error, Result}; -use crate::routes::filtering::{ObjectsFilterAll, S3ObjectsFilterAll}; +use crate::routes::filter::wildcard::WildcardEither; +use crate::routes::filter::{ObjectsFilter, S3ObjectsFilter}; use crate::routes::list::{ListCount, 
ListResponse}; use crate::routes::pagination::Pagination; @@ -46,17 +50,28 @@ where } /// Create a condition to filter a query. - pub fn filter_condition(filter: ObjectsFilterAll) -> Condition { - Condition::all().add_option( - filter - .attributes - .map(|v| Expr::col(object::Column::Attributes).contains(v)), - ) + pub fn filter_condition(filter: ObjectsFilter, case_sensitive: bool) -> Condition { + let mut condition = Condition::all(); + + if let Some(attributes) = filter.attributes { + let json_conditions = Self::json_conditions( + object::Column::Attributes.into_column_ref(), + attributes, + case_sensitive, + ); + for json_condition in json_conditions { + condition = condition.add(json_condition); + } + } + + condition } /// Filter records by all fields in the filter variable. - pub fn filter_all(mut self, filter: ObjectsFilterAll) -> Self { - self.select = self.select.filter(Self::filter_condition(filter)); + pub fn filter_all(mut self, filter: ObjectsFilter, case_sensitive: bool) -> Self { + self.select = self + .select + .filter(Self::filter_condition(filter, case_sensitive)); self } } @@ -88,8 +103,10 @@ where /// bucket = filter.bucket and /// ...; /// ``` - pub fn filter_all(mut self, filter: S3ObjectsFilterAll) -> Self { - self.select = self.select.filter(Self::filter_condition(filter)); + pub fn filter_all(mut self, filter: S3ObjectsFilter, case_sensitive: bool) -> Self { + self.select = self + .select + .filter(Self::filter_condition(filter, case_sensitive)); self.trace_query("filter_all"); @@ -97,44 +114,74 @@ where } /// Create a condition to filter a query. 
- pub fn filter_condition(filter: S3ObjectsFilterAll) -> Condition { - Condition::all() - .add_option( - filter - .event_type - .map(|v| s3_object::Column::EventType.eq(v)), - ) - .add_option(filter.bucket.map(|v| s3_object::Column::Bucket.eq(v))) - .add_option(filter.key.map(|v| s3_object::Column::Key.eq(v))) - .add_option( - filter - .version_id - .map(|v| s3_object::Column::VersionId.eq(v)), - ) - .add_option(filter.date.map(|v| s3_object::Column::Date.eq(v))) + pub fn filter_condition(filter: S3ObjectsFilter, case_sensitive: bool) -> Condition { + let mut condition = Condition::all() + .add_option(filter.event_type.map(|v| { + Self::filter_operation( + Expr::col(s3_object::Column::EventType), + v.map(|or| or.as_enum()), + case_sensitive, + ) + })) + .add_option(filter.bucket.map(|v| { + Self::filter_operation( + Expr::col(s3_object::Column::Bucket), + WildcardEither::Wildcard::(v), + case_sensitive, + ) + })) + .add_option(filter.key.map(|v| { + Self::filter_operation( + Expr::col(s3_object::Column::Key), + WildcardEither::Wildcard::(v), + case_sensitive, + ) + })) + .add_option(filter.version_id.map(|v| { + Self::filter_operation( + Expr::col(s3_object::Column::VersionId), + WildcardEither::Wildcard::(v), + case_sensitive, + ) + })) + .add_option(filter.date.map(|v| { + Self::filter_operation(Expr::col(s3_object::Column::Date), v, case_sensitive) + })) .add_option(filter.size.map(|v| s3_object::Column::Size.eq(v))) .add_option(filter.sha256.map(|v| s3_object::Column::Sha256.eq(v))) - .add_option( - filter - .last_modified_date - .map(|v| s3_object::Column::LastModifiedDate.eq(v)), - ) + .add_option(filter.last_modified_date.map(|v| { + Self::filter_operation( + Expr::col(s3_object::Column::LastModifiedDate), + v, + case_sensitive, + ) + })) .add_option(filter.e_tag.map(|v| s3_object::Column::ETag.eq(v))) - .add_option( - filter - .storage_class - .map(|v| s3_object::Column::StorageClass.eq(v)), - ) + .add_option(filter.storage_class.map(|v| { + 
Self::filter_operation( + Expr::col(s3_object::Column::StorageClass), + v.map(|or| or.as_enum()), + case_sensitive, + ) + })) .add_option( filter .is_delete_marker .map(|v| s3_object::Column::IsDeleteMarker.eq(v)), - ) - .add_option( - filter - .attributes - .map(|v| Expr::col(s3_object::Column::Attributes).contains(v)), - ) + ); + + if let Some(attributes) = filter.attributes { + let json_conditions = Self::json_conditions( + s3_object::Column::Attributes.into_column_ref(), + attributes, + case_sensitive, + ); + for json_condition in json_conditions { + condition = condition.add(json_condition); + } + } + + condition } /// Update this query to find objects that represent the current state of S3 objects. That is, @@ -285,6 +332,124 @@ where Ok(ListCount::new(self.count().await?)) } + /// Create an operation based on the wildcard. This either results in a `like`/`ilike` or `=` comparison. + pub fn filter_operation( + expr: S, + wildcard: WildcardEither, + case_sensitive: bool, + ) -> SimpleExpr + where + S: Into, + W: Into, + { + let text_cast = Alias::new("text"); + let expr = expr.into(); + match wildcard { + WildcardEither::Or(value) => expr.eq(value), + WildcardEither::Wildcard(wildcard) => { + if wildcard.contains_wildcard() && case_sensitive { + expr.cast_as(text_cast).like(wildcard.into_inner()) + } else if wildcard.contains_wildcard() && !case_sensitive { + expr.cast_as(text_cast).ilike(wildcard.into_inner()) + } else { + expr.eq(wildcard.into_inner()) + } + } + } + } + + /// Add a json condition using equality and a concrete type. + fn add_eq_condition( + acc: &mut Vec, + expr: SimpleExpr, + value: V, + case_sensitive: bool, + ) where + V: Into, + { + acc.push(Self::filter_operation( + expr, + WildcardEither::::or(value), + case_sensitive, + )) + } + + /// Add a json condition using like and a wildcard. 
+ fn add_like_condition( + acc: &mut Vec, + expr: SimpleExpr, + value: String, + case_sensitive: bool, + ) { + acc.push(Self::filter_operation( + expr, + WildcardEither::::wildcard(value), + case_sensitive, + )) + } + + /// A recursive function to convert a json value to postgres ->> statements. This traverses the + /// JSON tree and appends a list of conditions to `acc`. In practice, this should never + /// produce more than one condition if using serde_qs because serde_qs should only parse one nested + /// json object. However, it is implemented fully here in case it is useful for JSON-based rules. + fn apply_json_condition( + acc: &mut Vec, + expr: SimpleExpr, + json: JsonValue, + case_sensitive: bool, + ) { + let mut traverse_expr = |cast_expr, v| { + let expr = expr + .clone() + .cast_as(Alias::new("jsonb")) + .cast_json_field(cast_expr); + Self::apply_json_condition(acc, expr, v, case_sensitive) + }; + + match json { + // Primitive types are compared for equality. + v @ JsonValue::Null => Self::add_eq_condition(acc, expr, v, case_sensitive), + JsonValue::Bool(v) => Self::add_eq_condition(acc, expr, v, case_sensitive), + JsonValue::Number(v) => { + if let Some(n) = v.as_f64() { + Self::add_eq_condition(acc, expr, n, case_sensitive) + } else if let Some(n) = v.as_i64() { + Self::add_eq_condition(acc, expr, n, case_sensitive) + } else if let Some(n) = v.as_u64() { + Self::add_eq_condition(acc, expr, n, case_sensitive) + } + } + // Strings are compared as wildcards. + JsonValue::String(v) => Self::add_like_condition(acc, expr, v, case_sensitive), + // Arrays traverse with an index. + JsonValue::Array(array) => { + for (i, v) in array.into_iter().enumerate() { + traverse_expr(Expr::val(i as u32), v) + } + } + // Objects traverse with a key. + JsonValue::Object(o) => { + for (k, v) in o.into_iter() { + traverse_expr(Expr::val(k), v) + } + } + } + } + + /// Create a series of json conditions by traversing the JSON tree. 
+ pub fn json_conditions( + col: ColumnRef, + json: JsonValue, + case_sensitive: bool, + ) -> Vec { + let mut conditions = vec![]; + let expr = Expr::col(col).into_simple_expr(); + + Self::apply_json_condition(&mut conditions, expr, json, case_sensitive); + + conditions + } + /// Trace the current query. pub fn trace_query(&self, message: &str) { trace!( @@ -295,15 +460,22 @@ where } #[cfg(test)] -mod tests { +pub(crate) mod tests { + use sea_orm::prelude::Json; + use sea_orm::sea_query::extension::postgres::PgBinOper; + use sea_orm::sea_query::types::BinOper; + use sea_orm::sea_query::IntoColumnRef; + use sea_orm::DatabaseConnection; use serde_json::json; use sqlx::PgPool; use super::*; use crate::database::aws::migration::tests::MIGRATOR; - use crate::database::entities::sea_orm_active_enums::EventType; + use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; use crate::database::Client; + use crate::queries::update::tests::{change_many, entries_many, null_attributes}; use crate::queries::EntriesBuilder; + use crate::routes::filter::wildcard::Wildcard; #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_objects(pool: PgPool) { @@ -401,19 +573,25 @@ mod tests { let builder = ListQueryBuilder::<_, s3_object::Entity>::new(client.connection_ref()) .current_state() - .filter_all(S3ObjectsFilterAll { - size: Some(14), - ..Default::default() - }); + .filter_all( + S3ObjectsFilter { + size: Some(14), + ..Default::default() + }, + true, + ); let result = builder.all().await.unwrap(); assert_eq!(result, vec![entries[6].clone()]); // Order of filter call shouldn't matter. 
let builder = ListQueryBuilder::<_, s3_object::Entity>::new(client.connection_ref()) - .filter_all(S3ObjectsFilterAll { - size: Some(4), - ..Default::default() - }) + .filter_all( + S3ObjectsFilter { + size: Some(4), + ..Default::default() + }, + true, + ) .current_state(); let result = builder.all().await.unwrap(); assert_eq!(result, vec![entries[24].clone()]); @@ -426,35 +604,38 @@ mod tests { let result = filter_all_objects_from( &client, - ObjectsFilterAll { + ObjectsFilter { attributes: Some(json!({ "attribute_id": "1" })), }, + true, ) .await; assert_eq!(result, vec![entries[1].clone()]); let result = filter_all_objects_from( &client, - ObjectsFilterAll { + ObjectsFilter { attributes: Some(json!({ "nested_id": { "attribute_id": "1" } })), }, + true, ) .await; assert_eq!(result, vec![entries[1].clone()]); let result = filter_all_objects_from( &client, - ObjectsFilterAll { + ObjectsFilter { attributes: Some(json!({ "non_existent_id": "1" })), }, + true, ) .await; assert!(result.is_empty()); @@ -507,20 +688,15 @@ mod tests { let result = filter_all_s3_objects_from( &client, - S3ObjectsFilterAll { - event_type: Some(EventType::Created), + S3ObjectsFilter { + event_type: Some(WildcardEither::Or(EventType::Created)), ..Default::default() }, + true, ) .await; assert_eq!(result.len(), 5); - assert_eq!( - result, - entries - .into_iter() - .filter(|entry| entry.event_type == EventType::Created) - .collect::>() - ); + assert_eq!(result, filter_event_type(entries, EventType::Created)); } #[sqlx::test(migrator = "MIGRATOR")] @@ -534,11 +710,12 @@ mod tests { let result = filter_all_s3_objects_from( &client, - S3ObjectsFilterAll { - bucket: Some("0".to_string()), - key: Some("1".to_string()), + S3ObjectsFilter { + bucket: Some(Wildcard::new("0".to_string())), + key: Some(Wildcard::new("1".to_string())), ..Default::default() }, + true, ) .await; assert_eq!(result, vec![entries[1].clone()]); @@ -555,19 +732,20 @@ mod tests { let result = filter_all_s3_objects_from( &client, 
- S3ObjectsFilterAll { + S3ObjectsFilter { attributes: Some(json!({ "attribute_id": "1" })), ..Default::default() }, + true, ) .await; assert_eq!(result, vec![entries[1].clone()]); let result = filter_all_s3_objects_from( &client, - S3ObjectsFilterAll { + S3ObjectsFilter { attributes: Some(json!({ "nested_id": { "attribute_id": "2" @@ -575,44 +753,48 @@ mod tests { })), ..Default::default() }, + true, ) .await; assert_eq!(result, vec![entries[2].clone()]); let result = filter_all_s3_objects_from( &client, - S3ObjectsFilterAll { + S3ObjectsFilter { attributes: Some(json!({ "non_existent_id": "1" })), ..Default::default() }, + true, ) .await; assert!(result.is_empty()); let result = filter_all_s3_objects_from( &client, - S3ObjectsFilterAll { + S3ObjectsFilter { attributes: Some(json!({ "attribute_id": "1" })), - key: Some("2".to_string()), + key: Some(Wildcard::new("2".to_string())), ..Default::default() }, + true, ) .await; assert!(result.is_empty()); let result = filter_all_s3_objects_from( &client, - S3ObjectsFilterAll { + S3ObjectsFilter { attributes: Some(json!({ "attribute_id": "3" })), - key: Some("3".to_string()), + key: Some(Wildcard::new("3".to_string())), ..Default::default() }, + true, ) .await; assert_eq!(result, vec![entries[3].clone()]); @@ -671,6 +853,265 @@ mod tests { assert_eq!(builder.to_list_count().await.unwrap(), ListCount::new(10)); } + #[test] + fn test_filter_operation() { + let operation = ListQueryBuilder::::filter_operation( + Expr::col(s3_object::Column::StorageClass), + WildcardEither::Or(StorageClass::Standard), + true, + ); + assert!(matches!( + operation, + SimpleExpr::Binary(_, BinOper::Equal, _) + )); + + let operation = ListQueryBuilder::::filter_operation( + Expr::col(s3_object::Column::StorageClass), + WildcardEither::Or(StorageClass::Standard), + false, + ); + assert!(matches!( + operation, + SimpleExpr::Binary(_, BinOper::Equal, _) + )); + + let operation = ListQueryBuilder::::filter_operation( + 
Expr::col(s3_object::Column::StorageClass), + WildcardEither::Wildcard::(Wildcard::new("Standar%".to_string())), + true, + ); + assert!(matches!(operation, SimpleExpr::Binary(_, BinOper::Like, _))); + + let operation = ListQueryBuilder::::filter_operation( + Expr::col(s3_object::Column::StorageClass), + WildcardEither::Wildcard::(Wildcard::new("Standar%".to_string())), + false, + ); + assert!(matches!( + operation, + SimpleExpr::Binary(_, BinOper::PgOperator(PgBinOper::ILike), _) + )); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_s3_objects_filter_wildcard(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = EntriesBuilder::default() + .with_shuffle(true) + .build(&client) + .await; + let s3_entries = entries.s3_objects.clone(); + + let result = filter_all_s3_objects_from( + &client, + S3ObjectsFilter { + event_type: Some(WildcardEither::Wildcard(Wildcard::new( + "Cr___ed".to_string(), + ))), + ..Default::default() + }, + true, + ) + .await; + assert_eq!( + result, + filter_event_type(s3_entries.clone(), EventType::Created) + ); + + let result = filter_all_s3_objects_from( + &client, + S3ObjectsFilter { + event_type: Some(WildcardEither::Wildcard(Wildcard::new( + "cr___ed".to_string(), + ))), + ..Default::default() + }, + false, + ) + .await; + assert_eq!( + result, + filter_event_type(s3_entries.clone(), EventType::Created) + ); + + let result = filter_all_s3_objects_from( + &client, + S3ObjectsFilter { + bucket: Some(Wildcard::new("0%".to_string())), + ..Default::default() + }, + false, + ) + .await; + assert_eq!(result, &s3_entries[0..2]); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_s3_objects_wildcard_attributes(pool: PgPool) { + let client = Client::from_pool(pool); + let mut entries = EntriesBuilder::default() + .with_shuffle(true) + .build(&client) + .await; + + let test_attributes = json!({ + "nested_id": { + "attribute_id": "1" + }, + "attribute_id": "test" + }); + change_many(&client, &entries, 
&[0, 1], Some(test_attributes.clone())).await; + change_many( + &client, + &entries, + &(2..10).collect::>(), + Some(json!({ + "nested_id": { + "attribute_id": "test" + }, + "attribute_id": "1" + })), + ) + .await; + + // Filter with wildcard attributes. + let (objects, s3_objects) = filter_attributes( + &client, + Some(json!({ + "attribute_id": "t%" + })), + true, + ) + .await; + + entries_many(&mut entries, &[0, 1], test_attributes); + assert_eq!(objects, entries.objects[0..2].to_vec()); + assert_eq!(s3_objects, entries.s3_objects[0..2].to_vec()); + + let test_attributes = json!({ + "attribute_id": "attribute_id" + }); + change_many(&client, &entries, &[0, 1], Some(test_attributes.clone())).await; + change_many( + &client, + &entries, + &(2..10).collect::>(), + Some(json!({ + "nested_id": { + "attribute_id": "attribute_id" + } + })), + ) + .await; + + entries_many(&mut entries, &[0, 1], test_attributes); + + let (objects, s3_objects) = filter_attributes( + &client, + Some(json!({ + // This should not trigger a fetch on the nested id. + "attribute_id": "%a%" + })), + true, + ) + .await; + assert_eq!(objects, entries.objects[0..2].to_vec()); + assert_eq!(s3_objects, entries.s3_objects[0..2].to_vec()); + + let (objects, s3_objects) = filter_attributes( + &client, + Some(json!({ + // Case-insensitive should work + "attribute_id": "%A%" + })), + false, + ) + .await; + assert_eq!(objects, entries.objects[0..2].to_vec()); + assert_eq!(s3_objects, entries.s3_objects[0..2].to_vec()); + + null_attributes(&client, &entries, 0).await; + null_attributes(&client, &entries, 1).await; + + let (objects, s3_objects) = filter_attributes( + &client, + Some(json!({ + // A check is okay on null json as well. 
+ "attribute_id": "%1%" + })), + true, + ) + .await; + + assert!(objects.is_empty()); + assert!(s3_objects.is_empty()); + } + + #[test] + fn apply_json_condition() { + let conditions = ListQueryBuilder::::json_conditions( + s3_object::Column::Attributes.into_column_ref(), + json!({ "attribute_id": "1" }), + true, + ); + assert_eq!(conditions.len(), 1); + + let operation = conditions[0].clone(); + assert!(matches!( + conditions[0].clone(), + SimpleExpr::Binary(_, BinOper::Equal, _) + )); + assert_cast_json(&operation); + + let conditions = ListQueryBuilder::::json_conditions( + s3_object::Column::Attributes.into_column_ref(), + json!({ "attribute_id": "a%" }), + true, + ); + assert_eq!(conditions.len(), 1); + + let operation = conditions[0].clone(); + assert!(matches!(operation, SimpleExpr::Binary(_, BinOper::Like, _))); + assert_cast_json(&operation); + + let conditions = ListQueryBuilder::::json_conditions( + s3_object::Column::Attributes.into_column_ref(), + json!({ "attribute_id": "a%" }), + false, + ); + assert_eq!(conditions.len(), 1); + + let operation = conditions[0].clone(); + assert!(matches!( + operation, + SimpleExpr::Binary(_, BinOper::PgOperator(PgBinOper::ILike), _) + )); + assert_cast_json(&operation); + } + + fn assert_cast_json(operation: &SimpleExpr) { + if let SimpleExpr::Binary(operation, _, _) = operation { + if let SimpleExpr::FunctionCall(call) = operation.as_ref() { + call.get_args().iter().for_each(assert_cast_json); + } else { + assert!(matches!( + operation.as_ref(), + SimpleExpr::Binary(_, BinOper::PgOperator(PgBinOper::CastJsonField), _) + )); + } + } + } + + pub(crate) fn filter_event_type( + entries: Vec, + event_type: EventType, + ) -> Vec { + entries + .into_iter() + .filter(|entry| entry.event_type == event_type) + .collect::>() + } + async fn paginate_all( builder: ListQueryBuilder<'_, C, T>, page: u64, @@ -690,12 +1131,39 @@ mod tests { .unwrap() } + async fn filter_attributes( + client: &Client, + filter: Option, + 
case_sensitive: bool, + ) -> (Vec, Vec) { + ( + filter_all_objects_from( + client, + ObjectsFilter { + attributes: filter.clone(), + }, + case_sensitive, + ) + .await, + filter_all_s3_objects_from( + client, + S3ObjectsFilter { + attributes: filter, + ..Default::default() + }, + case_sensitive, + ) + .await, + ) + } + async fn filter_all_objects_from( client: &Client, - filter: ObjectsFilterAll, + filter: ObjectsFilter, + case_sensitive: bool, ) -> Vec { ListQueryBuilder::<_, object::Entity>::new(client.connection_ref()) - .filter_all(filter) + .filter_all(filter, case_sensitive) .all() .await .unwrap() @@ -703,10 +1171,11 @@ mod tests { async fn filter_all_s3_objects_from( client: &Client, - filter: S3ObjectsFilterAll, + filter: S3ObjectsFilter, + case_sensitive: bool, ) -> Vec { ListQueryBuilder::<_, s3_object::Entity>::new(client.connection_ref()) - .filter_all(filter) + .filter_all(filter, case_sensitive) .all() .await .unwrap() diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/update.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/update.rs index 1856f268a..58fd98048 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/update.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/update.rs @@ -4,22 +4,21 @@ use json_patch::patch; use sea_orm::prelude::{Expr, Json}; use sea_orm::sea_query::{ - Alias, Asterisk, CommonTableExpression, PostgresQueryBuilder, Query, SelectStatement, - SimpleExpr, WithClause, WithQuery, + Alias, Asterisk, CommonTableExpression, Query, SelectStatement, SimpleExpr, WithClause, + WithQuery, }; use sea_orm::{ ColumnTrait, ConnectionTrait, EntityTrait, FromQueryResult, Iterable, ModelTrait, QueryFilter, QueryTrait, StatementBuilder, Value, }; use serde_json::json; -use tracing::trace; use uuid::Uuid; use crate::database::entities::{object, s3_object}; use crate::error::Error::{InvalidQuery, QueryError}; use crate::error::Result; use 
crate::queries::list::ListQueryBuilder; -use crate::routes::filtering::{ObjectsFilterAll, S3ObjectsFilterAll}; +use crate::routes::filter::{ObjectsFilter, S3ObjectsFilter}; use crate::routes::update::PatchBody; /// A query builder for list operations. @@ -58,8 +57,8 @@ where } /// Filter records by all fields in the filter variable. - pub fn filter_all(mut self, filter: ObjectsFilterAll) -> Self { - self.select_to_update = self.select_to_update.filter_all(filter); + pub fn filter_all(mut self, filter: ObjectsFilter, case_sensitive: bool) -> Self { + self.select_to_update = self.select_to_update.filter_all(filter, case_sensitive); self.trace_query("filter_all"); @@ -97,8 +96,8 @@ where } /// Filter records by all fields in the filter variable. - pub fn filter_all(mut self, filter: S3ObjectsFilterAll) -> Self { - self.select_to_update = self.select_to_update.filter_all(filter); + pub fn filter_all(mut self, filter: S3ObjectsFilter, case_sensitive: bool) -> Self { + self.select_to_update = self.select_to_update.filter_all(filter, case_sensitive); self.trace_query("filter_all"); @@ -293,7 +292,6 @@ where fn trace_query(&self, message: &str) { self.select_to_update.trace_query(message); - trace!("{message}: {}", self.update.to_string(PostgresQueryBuilder)); } } @@ -303,6 +301,7 @@ pub(crate) mod tests { use std::ops::{Index, Range}; use crate::queries::{Entries, EntriesBuilder}; + use crate::routes::filter::wildcard::Wildcard; use sea_orm::{ActiveModelTrait, IntoActiveModel}; use sea_orm::{DatabaseConnection, Set}; use serde_json::json; @@ -311,17 +310,22 @@ pub(crate) mod tests { use crate::database::aws::migration::tests::MIGRATOR; - use crate::database::Client; - use super::*; + use crate::database::Client; + use crate::routes::filter::wildcard::WildcardEither; #[sqlx::test(migrator = "MIGRATOR")] async fn update_attributes_replace(pool: PgPool) { let client = Client::from_pool(pool); let mut entries = EntriesBuilder::default().build(&client).await; - 
change_attributes(&client, &entries, 0, Some(json!({"attribute_id": "1"}))).await; - change_attributes(&client, &entries, 1, Some(json!({"attribute_id": "1"}))).await; + change_many( + &client, + &entries, + &[0, 1], + Some(json!({"attribute_id": "1"})), + ) + .await; let patch = json!([ { "op": "test", "path": "/attribute_id", "value": "1" }, @@ -330,8 +334,11 @@ pub(crate) mod tests { let results = test_update_with_attribute_id(&client, patch).await; - change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"})).await; - change_attribute_entries(&mut entries, 1, json!({"attribute_id": "attribute_id"})).await; + entries_many( + &mut entries, + &[0, 1], + json!({"attribute_id": "attribute_id"}), + ); assert_contains(&results.0, &results.1, &entries, 0..2); assert_correct_records(&client, entries).await; @@ -342,8 +349,13 @@ pub(crate) mod tests { let client = Client::from_pool(pool); let mut entries = EntriesBuilder::default().build(&client).await; - change_attributes(&client, &entries, 0, Some(json!({"attribute_id": "1"}))).await; - change_attributes(&client, &entries, 1, Some(json!({"attribute_id": "1"}))).await; + change_many( + &client, + &entries, + &[0, 1], + Some(json!({"attribute_id": "1"})), + ) + .await; let patch = json!([ { "op": "add", "path": "/another_attribute", "value": "1" }, @@ -351,19 +363,48 @@ pub(crate) mod tests { let results = test_update_with_attribute_id(&client, patch).await; - change_attribute_entries( + entries_many( &mut entries, - 0, + &[0, 1], json!({"attribute_id": "1", "another_attribute": "1"}), + ); + + assert_contains(&results.0, &results.1, &entries, 0..2); + assert_correct_records(&client, entries).await; + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn update_attributes_add_wildcard(pool: PgPool) { + let client = Client::from_pool(pool); + let mut entries = EntriesBuilder::default().build(&client).await; + + change_many( + &client, + &entries, + &[0, 1], + Some(json!({"attribute_id": 
"attribute_id"})), ) .await; - change_attribute_entries( - &mut entries, - 1, - json!({"attribute_id": "1", "another_attribute": "1"}), + + let patch = json!([ + { "op": "add", "path": "/another_attribute", "value": "1" }, + ]); + + let results = test_update_attributes( + &client, + patch, + Some(json!({ + "attribute_id": "%a%" + })), ) .await; + entries_many( + &mut entries, + &[0, 1], + json!({"attribute_id": "attribute_id", "another_attribute": "1"}), + ); + assert_contains(&results.0, &results.1, &entries, 0..2); assert_correct_records(&client, entries).await; } @@ -373,8 +414,13 @@ pub(crate) mod tests { let client = Client::from_pool(pool); let mut entries = EntriesBuilder::default().build(&client).await; - change_attributes(&client, &entries, 0, Some(json!({"attribute_id": "1"}))).await; - change_attributes(&client, &entries, 1, Some(json!({"attribute_id": "1"}))).await; + change_many( + &client, + &entries, + &[0, 1], + Some(json!({"attribute_id": "1"})), + ) + .await; let patch = json!([ { "op": "add", "path": "/another_attribute", "value": "1" }, @@ -382,12 +428,15 @@ pub(crate) mod tests { let results = UpdateQueryBuilder::<_, s3_object::Entity>::new(client.connection_ref()) .current_state() - .filter_all(S3ObjectsFilterAll { - attributes: Some(json!({ - "attribute_id": "1" - })), - ..Default::default() - }) + .filter_all( + S3ObjectsFilter { + attributes: Some(json!({ + "attribute_id": "1" + })), + ..Default::default() + }, + true, + ) .update_s3_object_attributes(PatchBody::new(from_value(patch).unwrap())) .await .unwrap() @@ -406,13 +455,86 @@ pub(crate) mod tests { assert_correct_records(&client, entries).await; } + #[sqlx::test(migrator = "MIGRATOR")] + async fn update_s3_attributes_wildcard_like(pool: PgPool) { + let client = Client::from_pool(pool); + let mut entries = EntriesBuilder::default().build(&client).await; + + change_many( + &client, + &entries, + &[0, 2, 4, 6, 8], + Some(json!({"attribute_id": "1"})), + ) + .await; + + let patch = 
json!([ + { "op": "add", "path": "/another_attribute", "value": "1" }, + ]); + + let results = UpdateQueryBuilder::<_, s3_object::Entity>::new(client.connection_ref()) + .current_state() + .filter_all( + S3ObjectsFilter { + event_type: Some(WildcardEither::Wildcard(Wildcard::new("C%".to_string()))), + ..Default::default() + }, + true, + ) + .update_s3_object_attributes(PatchBody::new(from_value(patch).unwrap())) + .await + .unwrap() + .all() + .await + .unwrap(); + + assert_wildcard_update(&mut entries, &results); + assert_correct_records(&client, entries).await; + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn update_s3_attributes_wildcard_ilike(pool: PgPool) { + let client = Client::from_pool(pool); + let mut entries = EntriesBuilder::default().build(&client).await; + + change_many( + &client, + &entries, + &[0, 2, 4, 6, 8], + Some(json!({"attribute_id": "1"})), + ) + .await; + + let patch = json!([ + { "op": "add", "path": "/another_attribute", "value": "1" }, + ]); + + let results = UpdateQueryBuilder::<_, s3_object::Entity>::new(client.connection_ref()) + .current_state() + .filter_all( + S3ObjectsFilter { + event_type: Some(WildcardEither::Wildcard(Wildcard::new("c%".to_string()))), + ..Default::default() + }, + false, + ) + .update_s3_object_attributes(PatchBody::new(from_value(patch).unwrap())) + .await + .unwrap() + .all() + .await + .unwrap(); + + assert_wildcard_update(&mut entries, &results); + assert_correct_records(&client, entries).await; + } + #[sqlx::test(migrator = "MIGRATOR")] async fn update_attributes_add_from_null_json(pool: PgPool) { let client = Client::from_pool(pool); let mut entries = EntriesBuilder::default().build(&client).await; - change_attributes(&client, &entries, 0, Some(Value::Null)).await; - change_attributes(&client, &entries, 1, Some(Value::Null)).await; + change_many(&client, &entries, &[0, 1], Some(Value::Null)).await; let patch = json!([ { "op": "add", "path": "/another_attribute", "value": "1" }, @@ -420,8 +542,7 @@ 
pub(crate) mod tests { let results = test_update_attributes(&client, patch, Some(Value::Null)).await; - change_attribute_entries(&mut entries, 0, json!({"another_attribute": "1"})).await; - change_attribute_entries(&mut entries, 1, json!({"another_attribute": "1"})).await; + entries_many(&mut entries, &[0, 1], json!({"another_attribute": "1"})); assert_contains(&results.0, &results.1, &entries, 0..2); assert_correct_records(&client, entries).await; @@ -446,7 +567,7 @@ pub(crate) mod tests { ) .await; - change_attribute_entries(&mut entries, 0, json!({"another_attribute": "1"})).await; + change_attribute_entries(&mut entries, 0, json!({"another_attribute": "1"})); assert_contains(&results.0, &results.1, &entries, 0..1); assert_correct_records(&client, entries).await; @@ -457,8 +578,13 @@ pub(crate) mod tests { let client = Client::from_pool(pool); let mut entries = EntriesBuilder::default().build(&client).await; - change_attributes(&client, &entries, 0, Some(json!({"attribute_id": "1"}))).await; - change_attributes(&client, &entries, 1, Some(json!({"attribute_id": "1"}))).await; + change_many( + &client, + &entries, + &[0, 1], + Some(json!({"attribute_id": "1"})), + ) + .await; let patch = json!([ { "op": "remove", "path": "/attribute_id" }, @@ -466,8 +592,7 @@ pub(crate) mod tests { let results = test_update_with_attribute_id(&client, patch).await; - change_attribute_entries(&mut entries, 0, json!({})).await; - change_attribute_entries(&mut entries, 1, json!({})).await; + entries_many(&mut entries, &[0, 1], json!({})); assert_contains(&results.0, &results.1, &entries, 0..2); assert_correct_records(&client, entries).await; @@ -478,8 +603,13 @@ pub(crate) mod tests { let client = Client::from_pool(pool); let mut entries = EntriesBuilder::default().build(&client).await; - change_attributes(&client, &entries, 0, Some(json!({"attribute_id": "2"}))).await; - change_attributes(&client, &entries, 1, Some(json!({"attribute_id": "2"}))).await; + change_many( + &client, + 
&entries, + &[0, 1], + Some(json!({"attribute_id": "2"})), + ) + .await; let patch = json!([ { "op": "remove", "path": "/attribute_id" }, @@ -487,8 +617,7 @@ pub(crate) mod tests { let results = test_update_with_attribute_id(&client, patch).await; - change_attribute_entries(&mut entries, 0, json!({"attribute_id": "2"})).await; - change_attribute_entries(&mut entries, 1, json!({"attribute_id": "2"})).await; + entries_many(&mut entries, &[0, 1], json!({"attribute_id": "2"})); assert!(results.0.is_empty()); assert!(results.1.is_empty()); @@ -500,8 +629,13 @@ pub(crate) mod tests { let client = Client::from_pool(pool); let mut entries = EntriesBuilder::default().build(&client).await; - change_attributes(&client, &entries, 0, Some(json!({"attribute_id": "1"}))).await; - change_attributes(&client, &entries, 1, Some(json!({"attribute_id": "1"}))).await; + change_many( + &client, + &entries, + &[0, 1], + Some(json!({"attribute_id": "1"})), + ) + .await; let patch = json!([ { "op": "replace", "path": "/attribute_id", "value": "attribute_id" }, @@ -529,8 +663,7 @@ pub(crate) mod tests { assert!(matches!(s3_objects, Err(InvalidQuery(_)))); // Nothing should be updated here. 
- change_attribute_entries(&mut entries, 0, json!({"attribute_id": "1"})).await; - change_attribute_entries(&mut entries, 1, json!({"attribute_id": "1"})).await; + entries_many(&mut entries, &[0, 1], json!({"attribute_id": "1"})); assert_correct_records(&client, entries).await; } @@ -554,7 +687,7 @@ pub(crate) mod tests { ) .await; - change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"})).await; + change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"})); assert_contains(&result.0, &result.1, &entries, 0..1); assert_correct_records(&client, entries).await; @@ -594,9 +727,11 @@ pub(crate) mod tests { ) .await; - change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"})).await; - change_attribute_entries(&mut entries, 1, json!({"attribute_id": "attribute_id"})).await; - change_attribute_entries(&mut entries, 2, json!({"attribute_id": "attribute_id"})).await; + entries_many( + &mut entries, + &[0, 1, 2], + json!({"attribute_id": "attribute_id"}), + ); assert_model_contains(&results_objects.0, &entries.objects, 0..3); assert_model_contains(&results_s3_objects.1, &entries.s3_objects, 0..3); @@ -609,7 +744,7 @@ pub(crate) mod tests { attributes: Option, ) -> Result> { UpdateQueryBuilder::<_, object::Entity>::new(client.connection_ref()) - .filter_all(ObjectsFilterAll { attributes }) + .filter_all(ObjectsFilter { attributes }, true) .update_object_attributes(PatchBody::new(from_value(patch).unwrap())) .await } @@ -620,10 +755,13 @@ pub(crate) mod tests { attributes: Option, ) -> Result> { UpdateQueryBuilder::<_, s3_object::Entity>::new(client.connection_ref()) - .filter_all(S3ObjectsFilterAll { - attributes, - ..Default::default() - }) + .filter_all( + S3ObjectsFilter { + attributes, + ..Default::default() + }, + true, + ) .update_s3_object_attributes(PatchBody::new(from_value(patch).unwrap())) .await } @@ -689,6 +827,16 @@ pub(crate) mod tests { .await } + pub(crate) fn 
assert_wildcard_update(entries: &mut Entries, results: &[s3_object::Model]) { + for i in [0, 2, 4, 6, 8] { + // Only the created event should be updated. + entries.s3_objects[i].attributes = + Some(json!({"attribute_id": "1", "another_attribute": "1"})); + entries.objects[i].attributes = Some(json!({"attribute_id": "1"})); + assert_model_contains(&[results[i / 2].clone()], &entries.s3_objects, i..i + 1); + } + } + /// Make attributes null for an entry. pub(crate) async fn null_attributes(client: &Client, entries: &Entries, entry: usize) { change_attributes(client, entries, entry, None).await; @@ -705,6 +853,18 @@ pub(crate) mod tests { change_s3_object_attributes(client, entries, entry, value).await; } + /// Change multiple attributes in the database. + pub(crate) async fn change_many( + client: &Client, + entries: &Entries, + indices: &[usize], + value: Option, + ) { + for i in indices { + change_attributes(client, entries, *i, value.clone()).await; + } + } + async fn change_s3_object_attributes( client: &Client, entries: &Entries, @@ -728,16 +888,19 @@ pub(crate) mod tests { model.update(client.connection_ref()).await.unwrap(); } - /// Change attributes in the database. - pub(crate) async fn change_attribute_entries( - entries: &mut Entries, - entry: usize, - value: Value, - ) { + /// Change attributes in the entries. + pub(crate) fn change_attribute_entries(entries: &mut Entries, entry: usize, value: Value) { entries.s3_objects[entry].attributes = Some(value.clone()); entries.objects[entry].attributes = Some(value); } + /// Change multiple attributes in the entries. 
+ pub(crate) fn entries_many(entries: &mut Entries, indices: &[usize], value: Value) { + for i in indices { + change_attribute_entries(entries, *i, value.clone()); + } + } + pub(crate) fn assert_model_contains(objects: &[M], contains: &[M], range: Range) where M: Eq + PartialEq, diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/error.rs index bb31d41e9..66731d0a1 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/error.rs @@ -36,7 +36,7 @@ pub enum ErrorStatusCode { #[response( status = NOT_FOUND, description = "the resource or route could not be found", - example = json!({"message": "expected some value for id: `00000000-0000-0000-0000-000000000000`"}), + example = json!({"message": "expected record for id: `00000000-0000-0000-0000-000000000000`"}), )] NotFound(ErrorResponse), #[response( diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filtering.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs similarity index 61% rename from lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filtering.rs rename to lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs index 9bfbfbbb5..1a5630759 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filtering.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/mod.rs @@ -1,48 +1,51 @@ -//! API filtering related query logic. +//! Routing logic for query filtering. //! 
+pub mod wildcard; + use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; +use crate::routes::filter::wildcard::{Wildcard, WildcardEither}; use sea_orm::prelude::{DateTimeWithTimeZone, Json}; use serde::{Deserialize, Serialize}; use utoipa::{IntoParams, ToSchema}; /// The available fields to filter `s3_object` queries by. Each query parameter represents /// an `and` clause in the SQL statement. Nested query string style syntax is supported on -/// JSON attributes. +/// JSON attributes. Wildcards are supported on some of the fields. #[derive(Serialize, Deserialize, Debug, Default, IntoParams, ToSchema)] #[serde(default)] #[into_params(parameter_in = Query)] -pub struct S3ObjectsFilterAll { - #[param(required = false)] - /// Query by event type. - pub(crate) event_type: Option, +pub struct S3ObjectsFilter { + #[param(required = false, value_type = Wildcard)] + /// Query by event type. Supports wildcards. + pub(crate) event_type: Option>, #[param(required = false)] - /// Query by bucket. - pub(crate) bucket: Option, + /// Query by bucket. Supports wildcards. + pub(crate) bucket: Option, #[param(required = false)] - /// Query by key. - pub(crate) key: Option, + /// Query by key. Supports wildcards. + pub(crate) key: Option, #[param(required = false)] - /// Query by version_id. - pub(crate) version_id: Option, - #[param(required = false)] - /// Query by date. - pub(crate) date: Option, + /// Query by version_id. Supports wildcards. + pub(crate) version_id: Option, + #[param(required = false, value_type = Wildcard)] + /// Query by date. Supports wildcards. + pub(crate) date: Option>, #[param(required = false)] /// Query by size. pub(crate) size: Option, #[param(required = false)] /// Query by the sha256 checksum. pub(crate) sha256: Option, - #[param(required = false)] - /// Query by the last modified date. - pub(crate) last_modified_date: Option, + #[param(required = false, value_type = Wildcard)] + /// Query by the last modified date. 
Supports wildcards. + pub(crate) last_modified_date: Option>, #[param(required = false)] /// Query by the e_tag. pub(crate) e_tag: Option, - #[param(required = false)] - /// Query by the storage class. - pub(crate) storage_class: Option, + #[param(required = false, value_type = Wildcard)] + /// Query by the storage class. Supports wildcards. + pub(crate) storage_class: Option>, #[param(required = false)] /// Query by the object delete marker. pub(crate) is_delete_marker: Option, @@ -51,7 +54,7 @@ pub struct S3ObjectsFilterAll { /// fields, e.g. `attributes[attribute_id]=...`. This only deserializes /// into string fields, and does not support other JSON types. E.g. /// `attributes[attribute_id]=1` converts to `{ "attribute_id" = "1" }` - /// rather than `{ "attribute_id" = 1 }`. + /// rather than `{ "attribute_id" = 1 }`. Supports wildcards. pub(crate) attributes: Option, } @@ -61,12 +64,12 @@ pub struct S3ObjectsFilterAll { #[derive(Serialize, Deserialize, Debug, Default, IntoParams, ToSchema)] #[serde(default)] #[into_params(parameter_in = Query)] -pub struct ObjectsFilterAll { +pub struct ObjectsFilter { #[param(required = false)] /// Query by JSON attributes. Supports nested syntax to access inner /// fields, e.g. `attributes[attribute_id]=...`. This only deserializes /// into string fields, and does not support other JSON types. E.g. /// `attributes[attribute_id]=1` converts to `{ "attribute_id" = "1" }` - /// rather than `{ "attribute_id" = 1 }`. + /// rather than `{ "attribute_id" = 1 }`. Supports wildcards. pub(crate) attributes: Option, } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/wildcard.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/wildcard.rs new file mode 100644 index 000000000..bdcaaf254 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/filter/wildcard.rs @@ -0,0 +1,148 @@ +//! Wildcard filtering logic. +//! 
+ +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +/// An enum which deserializes into a concrete type or a wildcard. This is used for better +/// type support when using non-string filter parameters such as `StorageClass` or `EventType`. +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +#[serde(untagged)] +pub enum WildcardEither { + Or(T), + Wildcard(Wildcard), +} + +impl WildcardEither { + /// Create an or variant. + pub fn or(value: T) -> WildcardEither { + Self::Or(value) + } + + /// Create a wildcard variant. + pub fn wildcard(wildcard: String) -> WildcardEither { + Self::Wildcard(Wildcard::new(wildcard)) + } + + /// Map the function to the type if this is an `Or` variant. + pub fn map(self, f: F) -> WildcardEither + where + F: FnOnce(T) -> U, + { + match self { + Self::Or(or) => WildcardEither::Or(f(or)), + Self::Wildcard(wildcard) => WildcardEither::Wildcard(wildcard), + } + } +} + +/// A wildcard type represents a filter to match arbitrary characters. Use '%' for multiple characters +/// and '_' for a single character. Use '\\' to escape these characters. Wildcards are converted to +/// postgres `like` or `ilike` queries. +#[derive(Serialize, Deserialize, Debug, Default, ToSchema, Eq, PartialEq)] +#[serde(default)] +pub struct Wildcard(pub(crate) String); + +impl Wildcard { + /// Create a new wildcard. + pub fn new(wildcard: String) -> Self { + Self(wildcard) + } + + /// Get the inner string value. + pub fn into_inner(self) -> String { + self.0 + } + + /// Check whether there are wildcard matches contained in this wildcard. 
This is useful to + /// convert the wildcard to a postgres `like` statement, because wildcards without any + /// `%` or `_` don't need to be run through `like` and can instead be used in an equality + /// comparison + pub fn contains_wildcard(&self) -> bool { + let mut chars = self.0.chars().peekable(); + + while let Some(char) = chars.next() { + // If there is a backslash, then the next character can be either '%' or '_' and not + // pass this check. + if char == '\\' { + let peek = chars.peek(); + if let Some(next_char) = peek { + if *next_char == '%' || *next_char == '_' { + // Skip the next character as we have just processed it. + chars.next(); + continue; + } + } + } + + // This will result in a wildcard match as it is not escaped. + if char == '%' || char == '_' { + return true; + } + } + + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; + use sea_orm::prelude::DateTimeWithTimeZone; + use serde_json::json; + + #[test] + fn deserialize_wildcard_either() { + let wildcard: WildcardEither = serde_json::from_value(json!("Created")).unwrap(); + assert_eq!(wildcard, WildcardEither::Or(EventType::Created)); + let wildcard: WildcardEither = + serde_json::from_value(json!("1970-01-01T00:00:00.000Z")).unwrap(); + assert_eq!( + wildcard, + WildcardEither::Or(DateTimeWithTimeZone::default()) + ); + let wildcard: WildcardEither = + serde_json::from_value(json!("Standard")).unwrap(); + assert_eq!(wildcard, WildcardEither::Or(StorageClass::Standard)); + + let wildcard: WildcardEither = serde_json::from_value(json!("Create%")).unwrap(); + assert_eq!( + wildcard, + WildcardEither::Wildcard(Wildcard::new("Create%".to_string())) + ); + let wildcard: WildcardEither = + serde_json::from_value(json!("1970-01-01%")).unwrap(); + assert_eq!( + wildcard, + WildcardEither::Wildcard(Wildcard::new("1970-01-01%".to_string())) + ); + let wildcard: WildcardEither = + 
serde_json::from_value(json!("Standar%")).unwrap(); + assert_eq!( + wildcard, + WildcardEither::Wildcard(Wildcard::new("Standar%".to_string())) + ); + } + + #[test] + fn wildcard_deserialize() { + assert!(!Wildcard::new(r#"test"#.to_string()).contains_wildcard()); + assert!(!Wildcard::new(r#"tes\n"#.to_string()).contains_wildcard()); + + assert!(Wildcard::new(r#"t%st"#.to_string()).contains_wildcard()); + assert!(Wildcard::new(r#"t_st"#.to_string()).contains_wildcard()); + + assert!(!Wildcard::new(r#"t\%st"#.to_string()).contains_wildcard()); + assert!(!Wildcard::new(r#"t\_st"#.to_string()).contains_wildcard()); + assert!(!Wildcard::new(r#"t\\st"#.to_string()).contains_wildcard()); + + assert!(Wildcard::new(r#"te%%"#.to_string()).contains_wildcard()); + assert!(Wildcard::new(r#"te__"#.to_string()).contains_wildcard()); + assert!(!Wildcard::new(r#"te\\\\"#.to_string()).contains_wildcard()); + + assert!(Wildcard::new(r#"te\%%"#.to_string()).contains_wildcard()); + assert!(Wildcard::new(r#"te\__"#.to_string()).contains_wildcard()); + assert!(!Wildcard::new(r#"tes\\"#.to_string()).contains_wildcard()); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs index f101159c4..c1acaa2fe 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs @@ -7,7 +7,7 @@ use crate::database::entities::{object, s3_object}; use crate::error::Result; use crate::queries::list::ListQueryBuilder; use crate::routes::error::ErrorStatusCode; -use crate::routes::filtering::{ObjectsFilterAll, S3ObjectsFilterAll}; +use crate::routes::filter::{ObjectsFilter, S3ObjectsFilter}; use crate::routes::pagination::Pagination; use crate::routes::AppState; use axum::extract::{Query, State}; @@ -65,25 +65,51 @@ impl ListResponse { } } +/// Params for wildcard requests. 
+#[derive(Debug, Deserialize, Default, IntoParams)] +#[serde(default)] +#[into_params(parameter_in = Query)] +pub struct WildcardParams { + /// The case sensitivity when using filter operations with a wildcard. + /// Setting this to true means that an SQL `like` statement is used, and false + /// means `ilike` is used. + #[serde(default = "default_case_sensitivity")] + #[param(nullable, default = true)] + pub(crate) case_sensitive: bool, +} + +impl WildcardParams { + /// Create new wildcard params. + pub fn new(case_sensitive: bool) -> Self { + Self { case_sensitive } + } + + /// Get the case sensitivity. + pub fn case_sensitive(&self) -> bool { + self.case_sensitive + } +} + /// List all objects according to the parameters. #[utoipa::path( get, path = "/objects", responses( - (status = OK, description = "The collection of objects", body = Vec), + (status = OK, description = "The collection of objects", body = ListResponseObject), ErrorStatusCode, ), - params(Pagination, ObjectsFilterAll), + params(Pagination, WildcardParams, ObjectsFilter), context_path = "/api/v1", tag = "list", )] pub async fn list_objects( state: State, Query(pagination): Query, - QsQuery(filter_all): QsQuery, + Query(wildcard): Query, + QsQuery(filter_all): QsQuery, ) -> Result>> { let response = ListQueryBuilder::<_, object::Entity>::new(state.client.connection_ref()) - .filter_all(filter_all) + .filter_all(filter_all, wildcard.case_sensitive()) .paginate_to_list_response(pagination) .await?; @@ -98,22 +124,28 @@ pub async fn list_objects( (status = OK, description = "The count of objects", body = ListCount), ErrorStatusCode, ), - params(ObjectsFilterAll), + params(WildcardParams, ObjectsFilter), context_path = "/api/v1", tag = "list", )] pub async fn count_objects( state: State, - QsQuery(filter_all): QsQuery, + Query(wildcard): Query, + QsQuery(filter_all): QsQuery, ) -> Result> { let response = ListQueryBuilder::<_, object::Entity>::new(state.client.connection_ref()) - 
.filter_all(filter_all) 
+        .filter_all(filter_all, wildcard.case_sensitive())
         .to_list_count()
         .await?;

     Ok(Json(response))
 }

+/// The default case sensitivity for s3 object filter queries.
+pub fn default_case_sensitivity() -> bool {
+    true
+}
+
 /// Params for a list s3 objects request.
 #[derive(Debug, Deserialize, Default, IntoParams)]
 #[serde(default)]
@@ -148,21 +180,22 @@ impl ListS3ObjectsParams {
     get,
     path = "/s3_objects",
     responses(
-        (status = OK, description = "The collection of s3_objects", body = Vec),
+        (status = OK, description = "The collection of s3_objects", body = ListResponseS3Object),
         ErrorStatusCode,
     ),
-    params(Pagination, ListS3ObjectsParams, S3ObjectsFilterAll),
+    params(Pagination, WildcardParams, ListS3ObjectsParams, S3ObjectsFilter),
     context_path = "/api/v1",
     tag = "list",
 )]
 pub async fn list_s3_objects(
     state: State,
     Query(pagination): Query,
+    Query(wildcard): Query,
     Query(list): Query,
-    QsQuery(filter_all): QsQuery,
+    QsQuery(filter_all): QsQuery,
 ) -> Result>> {
     let mut response = ListQueryBuilder::<_, s3_object::Entity>::new(state.client.connection_ref())
-        .filter_all(filter_all);
+        .filter_all(filter_all, wildcard.case_sensitive());

     if list.current_state {
         response = response.current_state();
@@ -179,17 +212,18 @@ pub async fn list_s3_objects(
         (status = OK, description = "The count of s3 objects", body = ListCount),
         ErrorStatusCode,
     ),
-    params(ListS3ObjectsParams, S3ObjectsFilterAll),
+    params(WildcardParams, ListS3ObjectsParams, S3ObjectsFilter),
     context_path = "/api/v1",
     tag = "list",
 )]
 pub async fn count_s3_objects(
     state: State,
+    Query(wildcard): Query,
     Query(list): Query,
-    QsQuery(filter_all): QsQuery,
+    QsQuery(filter_all): QsQuery,
 ) -> Result> {
     let mut response = ListQueryBuilder::<_, s3_object::Entity>::new(state.client.connection_ref())
-        .filter_all(filter_all);
+        .filter_all(filter_all, wildcard.case_sensitive());

     if list.current_state {
         response = response.current_state();
@@ -214,12 +248,15 @@ pub(crate) mod tests {
     use axum::http::header::CONTENT_TYPE;
     use axum::http::{Method, Request, StatusCode};
     use serde::de::DeserializeOwned;
-    use serde_json::from_slice;
+    use serde_json::{from_slice, json};
     use sqlx::PgPool;
     use tower::ServiceExt;

     use crate::database::aws::migration::tests::MIGRATOR;
     use crate::database::entities::sea_orm_active_enums::EventType;
+    use crate::queries::list::tests::filter_event_type;
+    use crate::queries::update::tests::change_many;
+    use crate::queries::update::tests::{assert_contains, entries_many};
     use crate::queries::EntriesBuilder;
     use crate::routes::api_router;

@@ -327,10 +364,7 @@ pub(crate) mod tests {
         assert_eq!(result.results().len(), 5);
         assert_eq!(
             result.results,
-            entries
-                .into_iter()
-                .filter(|entry| entry.event_type == EventType::Deleted)
-                .collect::>()
+            filter_event_type(entries, EventType::Deleted)
         );
     }

@@ -384,6 +418,51 @@ pub(crate) mod tests {
         assert_eq!(result.results, vec![entries[1].clone()]);
     }

+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn list_objects_filter_attributes(pool: PgPool) {
+        let state = AppState::from_pool(pool);
+        let mut entries = EntriesBuilder::default().build(state.client()).await;
+
+        change_many(
+            state.client(),
+            &entries,
+            &[0, 1],
+            Some(json!({"attribute_id": "attribute_id"})),
+        )
+        .await;
+
+        entries_many(
+            &mut entries,
+            &[0, 1],
+            json!({"attribute_id": "attribute_id"}),
+        );
+
+        let s3_objects: ListResponse =
+            response_from_get(state.clone(), "/s3_objects?attributes[attribute_id]=%a%").await;
+        let objects: ListResponse =
+            response_from_get(state.clone(), "/objects?attributes[attribute_id]=%a%").await;
+        assert_contains(&objects.results, &s3_objects.results, &entries, 0..2);
+
+        let s3_objects: ListResponse =
+            response_from_get(state.clone(), "/s3_objects?attributes[attribute_id]=%A%").await;
+        let objects: ListResponse =
+            response_from_get(state.clone(), "/objects?attributes[attribute_id]=%A%").await;
+        assert!(s3_objects.results.is_empty());
+        assert!(objects.results.is_empty());
+
+        let s3_objects: ListResponse = response_from_get(
+            state.clone(),
+            "/s3_objects?attributes[attribute_id]=%A%&case_sensitive=false",
+        )
+        .await;
+        let objects: ListResponse = response_from_get(
+            state.clone(),
+            "/objects?attributes[attribute_id]=%A%&case_sensitive=false",
+        )
+        .await;
+        assert_contains(&objects.results, &s3_objects.results, &entries, 0..2);
+    }
+
     #[sqlx::test(migrator = "MIGRATOR")]
     async fn count_objects_api(pool: PgPool) {
         let state = AppState::from_pool(pool);
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs
index 5628845a0..4a599e17b 100644
--- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs
+++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs
@@ -17,7 +17,7 @@ use crate::routes::openapi::swagger_ui;
 use crate::routes::update::update_router;

 pub mod error;
-pub mod filtering;
+pub mod filter;
 pub mod get;
 pub mod ingest;
 pub mod list;
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/openapi.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/openapi.rs
index e092ccff8..ea3283cbf 100644
--- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/openapi.rs
+++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/openapi.rs
@@ -12,6 +12,7 @@ use crate::database::entities::s3_object::Model as FileS3Object;
 use crate::database::entities::sea_orm_active_enums::EventType;
 use crate::database::entities::sea_orm_active_enums::StorageClass;
 use crate::routes::error::ErrorResponse;
+use crate::routes::filter::wildcard::Wildcard;
 use crate::routes::get::*;
 use crate::routes::ingest::*;
 use crate::routes::list::*;
@@ -53,11 +54,12 @@ pub struct Json(pub Value);
             ListCount,
             IngestCount,
             DateTimeWithTimeZone,
+            Wildcard,
             Json,
             ListResponseObject,
             ListResponseS3Object,
             PatchBody,
-            Patch
+            Patch,
         )
     ),
     modifiers(&SecurityAddon),
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/update.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/update.rs
index ee947d039..907769d96 100644
--- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/update.rs
+++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/update.rs
@@ -14,8 +14,8 @@ use crate::error::Error::ExpectedSomeValue;
 use crate::error::Result;
 use crate::queries::update::UpdateQueryBuilder;
 use crate::routes::error::ErrorStatusCode;
-use crate::routes::filtering::{ObjectsFilterAll, S3ObjectsFilterAll};
-use crate::routes::list::ListS3ObjectsParams;
+use crate::routes::filter::{ObjectsFilter, S3ObjectsFilter};
+use crate::routes::list::{ListS3ObjectsParams, WildcardParams};
 use crate::routes::AppState;

 /// The attributes to update for the request. This updates attributes according to JSON patch.
@@ -110,20 +110,21 @@ pub async fn update_object_attributes(
         ),
         ErrorStatusCode,
     ),
-    params(ObjectsFilterAll),
+    params(WildcardParams, ObjectsFilter),
     request_body = PatchBody,
     context_path = "/api/v1",
     tag = "update",
 )]
 pub async fn update_object_collection_attributes(
     state: State,
-    QsQuery(filter_all): QsQuery,
+    Query(wildcard): Query,
+    QsQuery(filter_all): QsQuery,
     Json(patch): Json,
 ) -> Result>> {
     let txn = state.client().connection_ref().begin().await?;

     let results = UpdateQueryBuilder::<_, object::Entity>::new(&txn)
-        .filter_all(filter_all)
+        .filter_all(filter_all, wildcard.case_sensitive())
         .update_object_attributes(patch)
         .await?
         .all()
@@ -183,20 +184,22 @@ pub async fn update_s3_object_attributes(
         ),
         ErrorStatusCode,
     ),
-    params(ListS3ObjectsParams, ObjectsFilterAll),
+    params(WildcardParams, ListS3ObjectsParams, ObjectsFilter),
     request_body = PatchBody,
     context_path = "/api/v1",
     tag = "update",
 )]
 pub async fn update_s3_object_collection_attributes(
     state: State,
+    Query(wildcard): Query,
     Query(list): Query,
-    QsQuery(filter_all): QsQuery,
+    QsQuery(filter_all): QsQuery,
     Json(patch): Json,
 ) -> Result>> {
     let txn = state.client().connection_ref().begin().await?;

-    let mut results = UpdateQueryBuilder::<_, s3_object::Entity>::new(&txn).filter_all(filter_all);
+    let mut results = UpdateQueryBuilder::<_, s3_object::Entity>::new(&txn)
+        .filter_all(filter_all, wildcard.case_sensitive());

     if list.current_state() {
         results = results.current_state();
@@ -224,6 +227,7 @@ pub fn update_router() -> Router {

 #[cfg(test)]
 mod tests {
+    use crate::queries::update::tests::{assert_contains, entries_many};
     use axum::body::Body;
     use axum::http::{Method, StatusCode};
     use serde_json::json;
@@ -233,7 +237,8 @@ mod tests {

     use super::*;
     use crate::queries::update::tests::{
-        assert_correct_records, assert_model_contains, change_attribute_entries, change_attributes,
+        assert_correct_records, assert_model_contains, assert_wildcard_update,
+        change_attribute_entries, change_attributes, change_many,
     };
     use crate::queries::EntriesBuilder;
     use crate::routes::list::tests::response_from;
@@ -273,7 +278,7 @@ mod tests {
         )
         .await;

-        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"})).await;
+        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"}));

         assert_model_contains(&[object], &entries.objects, 0..1);
         assert_model_contains(&[s3_object], &entries.s3_objects, 0..1);
@@ -317,7 +322,7 @@ mod tests {
         assert_eq!(s3_object_status_code, StatusCode::NOT_FOUND);

         // Nothing is expected to change.
-        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "1"})).await;
+        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "1"}));
         assert_correct_records(state.client(), entries).await;
     }

@@ -361,8 +366,8 @@ mod tests {
         )
         .await;

-        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"})).await;
-        change_attribute_entries(&mut entries, 1, json!({"attribute_id": "attribute_id"})).await;
+        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "attribute_id"}));
+        change_attribute_entries(&mut entries, 1, json!({"attribute_id": "attribute_id"}));

         assert_model_contains(&objects, &entries.objects, 0..2);
         assert_model_contains(&s3_objects, &entries.s3_objects, 0..2);
@@ -454,8 +459,169 @@ mod tests {
         assert!(objects.is_empty());
         assert!(s3_objects.is_empty());

-        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "2"})).await;
-        change_attribute_entries(&mut entries, 1, json!({"attribute_id": "2"})).await;
+        change_attribute_entries(&mut entries, 0, json!({"attribute_id": "2"}));
+        change_attribute_entries(&mut entries, 1, json!({"attribute_id": "2"}));
+        assert_correct_records(state.client(), entries).await;
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn update_attributes_wildcard_like(pool: PgPool) {
+        let state = AppState::from_pool(pool);
+        let mut entries = EntriesBuilder::default().build(state.client()).await;
+
+        change_many(
+            state.client(),
+            &entries,
+            &[0, 1],
+            Some(json!({"attribute_id": "attribute_id"})),
+        )
+        .await;
+
+        let patch = json!({"attributes": [
+            { "op": "add", "path": "/another_attribute", "value": "1" },
+        ]});
+
+        entries_many(
+            &mut entries,
+            &[0, 1],
+            json!({"attribute_id": "attribute_id", "another_attribute": "1"}),
+        );
+
+        let (_, s3_objects) = response_from::>(
+            state.clone(),
+            "/s3_objects?attributes[attribute_id]=%a%",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+        let (_, objects) = response_from::>(
+            state.clone(),
+            "/objects?attributes[attribute_id]=%a%",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+
+        assert_contains(&objects, &s3_objects, &entries, 0..2);
+        assert_correct_records(state.client(), entries).await;
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn update_attributes_wildcard_ilike(pool: PgPool) {
+        let state = AppState::from_pool(pool);
+        let mut entries = EntriesBuilder::default().build(state.client()).await;
+
+        change_many(
+            state.client(),
+            &entries,
+            &[0, 1],
+            Some(json!({"attribute_id": "attribute_id"})),
+        )
+        .await;
+
+        let patch = json!({"attributes": [
+            { "op": "add", "path": "/another_attribute", "value": "1" },
+        ]});
+
+        entries_many(
+            &mut entries,
+            &[0, 1],
+            json!({"attribute_id": "attribute_id", "another_attribute": "1"}),
+        );
+
+        let (_, s3_objects) = response_from::>(
+            state.clone(),
+            "/s3_objects?attributes[attribute_id]=%A%",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+        let (_, objects) = response_from::>(
+            state.clone(),
+            "/objects?attributes[attribute_id]=%A%",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+
+        assert!(s3_objects.is_empty());
+        assert!(objects.is_empty());
+
+        let (_, s3_objects) = response_from::>(
+            state.clone(),
+            "/s3_objects?attributes[attribute_id]=%A%&case_sensitive=false",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+        let (_, objects) = response_from::>(
+            state.clone(),
+            "/objects?attributes[attribute_id]=%A%&case_sensitive=false",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+
+        assert_contains(&objects, &s3_objects, &entries, 0..2);
+        assert_correct_records(state.client(), entries).await;
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn update_attributes_api_wildcard_like(pool: PgPool) {
+        let state = AppState::from_pool(pool);
+        let mut entries = EntriesBuilder::default().build(state.client()).await;
+
+        change_many(
+            state.client(),
+            &entries,
+            &[0, 2, 4, 6, 8],
+            Some(json!({"attribute_id": "1"})),
+        )
+        .await;
+
+        let patch = json!({"attributes": [
+            { "op": "add", "path": "/another_attribute", "value": "1" },
+        ]});
+
+        let (_, s3_objects) = response_from::>(
+            state.clone(),
+            "/s3_objects?event_type=C__%",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+
+        assert_wildcard_update(&mut entries, &s3_objects);
+        assert_correct_records(state.client(), entries).await;
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn update_attributes_api_wildcard_ilike(pool: PgPool) {
+        let state = AppState::from_pool(pool);
+        let mut entries = EntriesBuilder::default().build(state.client()).await;
+
+        change_many(
+            state.client(),
+            &entries,
+            &[0, 2, 4, 6, 8],
+            Some(json!({"attribute_id": "1"})),
+        )
+        .await;
+
+        let patch = json!({"attributes": [
+            { "op": "add", "path": "/another_attribute", "value": "1" },
+        ]});
+
+        let (_, s3_objects) = response_from::>(
+            state.clone(),
+            // Percent-encoding should work too.
+            "/s3_objects?case_sensitive=false&event_type=c%25_%25d",
+            Method::PATCH,
+            Body::new(patch.to_string()),
+        )
+        .await;
+
+        assert_wildcard_update(&mut entries, &s3_objects);
         assert_correct_records(state.client(), entries).await;
     }
 }
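Reviewer note: the tests in this change rely on SQL wildcard semantics, where `%` matches any run of characters and `_` matches exactly one, with `case_sensitive` choosing between `like` and `ilike`. The sketch below models that matching in plain Rust so the expected test outcomes can be checked by hand. It is only an illustration: `like_match` is a hypothetical helper, not part of the filemanager crate, the real matching is done by the database, and escape characters are deliberately not handled.

```rust
/// Hypothetical helper (not part of the filemanager crate): match `value`
/// against an SQL-style wildcard `pattern`.
///
/// `%` matches any run of characters (including none) and `_` matches exactly
/// one character. When `case_sensitive` is false, both sides are lowercased
/// first, which approximates `ilike`. Escape sequences are not supported.
pub fn like_match(pattern: &str, value: &str, case_sensitive: bool) -> bool {
    let fold = |s: &str| -> Vec<char> {
        if case_sensitive {
            s.chars().collect()
        } else {
            s.to_lowercase().chars().collect()
        }
    };
    let (p, v) = (fold(pattern), fold(value));

    // matches[j] is true when the pattern consumed so far matches v[..j].
    let mut matches = vec![false; v.len() + 1];
    matches[0] = true;

    for &pc in &p {
        let mut next = vec![false; v.len() + 1];
        if pc == '%' {
            // `%` extends any earlier match to every longer prefix.
            let mut seen = false;
            for j in 0..=v.len() {
                seen |= matches[j];
                next[j] = seen;
            }
        } else {
            // A literal or `_` consumes exactly one character of the value.
            for j in 1..=v.len() {
                next[j] = matches[j - 1] && (pc == '_' || pc == v[j - 1]);
            }
        }
        matches = next;
    }

    matches[v.len()]
}
```

Under this model, `like_match("%a%", "attribute_id", true)` holds, `like_match("%A%", "attribute_id", true)` does not, and passing `false` for `case_sensitive` makes the second pattern match, which mirrors what the `case_sensitive=false` assertions in the tests expect.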