fixed file upload
diptanu committed Sep 5, 2024
1 parent f9d1706 commit 8e4277f
Showing 4 changed files with 103 additions and 60 deletions.
6 changes: 2 additions & 4 deletions server-next/blob_store/src/lib.rs
@@ -6,9 +6,7 @@ use bytes::{Bytes, BytesMut};
 use futures::{stream::BoxStream, StreamExt};
 use object_store::{
     aws::{AmazonS3, AmazonS3Builder},
-    local,
-    ObjectStore,
-    WriteMultipart,
+    local, ObjectStore, WriteMultipart,
 };
 use serde::{Deserialize, Serialize};
 use tokio::io::AsyncWrite;
@@ -151,7 +149,7 @@ impl BlobStorage {
             )
         }
     }
 
     pub fn get(&self, key: &str) -> BlobStorageReaderTS {
         if key.starts_with("s3://") {
             let (bucket, key) = parse_s3_url(key)
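
Note: the `get` path above branches on "s3://" URLs via `parse_s3_url`. A minimal sketch of what such a helper might do, assuming it simply splits the URL into a bucket and an object key — the real helper is defined elsewhere in blob_store and may differ:

// Hypothetical stand-in for blob_store's parse_s3_url; splits
// "s3://bucket/key" into (bucket, key).
fn parse_s3_url(url: &str) -> Result<(&str, &str), String> {
    let path = url
        .strip_prefix("s3://")
        .ok_or_else(|| format!("not an s3 url: {}", url))?;
    path.split_once('/')
        .ok_or_else(|| format!("missing object key in: {}", url))
}

fn main() {
    assert_eq!(
        parse_s3_url("s3://my-bucket/path/to/object"),
        Ok(("my-bucket", "path/to/object"))
    );
}
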
6 changes: 1 addition & 5 deletions server-next/blob_store/src/s3.rs
@@ -13,11 +13,7 @@ use tokio::sync::mpsc;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 
 use super::{
-    BlobStorageConfig,
-    BlobStoragePartWriter,
-    BlobStorageReader,
-    BlobStorageWriter,
-    PutResult,
+    BlobStorageConfig, BlobStoragePartWriter, BlobStorageReader, BlobStorageWriter, PutResult,
     StoragePartWriter,
 };
 
11 changes: 5 additions & 6 deletions server-next/src/http_objects.rs
@@ -266,15 +266,14 @@ pub struct GraphInvocations
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct GraphInput {
-    // file:///s3://bucket/key
-    // file:///data/path/to/file
-    pub payload: String,
-    pub labels: HashMap<String, String>,
-    pub input: serde_json::Value,
+    pub payload: serde_json::Value,
 }
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct IndexifyFile {
+    // file:///s3://bucket/key
+    // file:///data/path/to/file
     pub url: String,
-    pub hash: String,
+    pub metadata: serde_json::Value,
+    pub sha_256: String,
 }
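
Note: a sketch of the wire format the reworked structs imply — an invocation payload is now arbitrary JSON, and an uploaded file is described by its blob-store URL, caller-supplied metadata, and SHA-256 digest. The field values below are invented for illustration:

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct IndexifyFile {
    pub url: String,
    pub metadata: serde_json::Value,
    pub sha_256: String,
}

fn main() -> serde_json::Result<()> {
    let file = IndexifyFile {
        // illustrative values only
        url: "s3://bucket/key".to_string(),
        metadata: serde_json::json!({ "source": "example" }),
        sha_256: "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447".to_string(),
    };
    println!("{}", serde_json::to_string_pretty(&file)?);
    Ok(())
}
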
140 changes: 95 additions & 45 deletions server-next/src/routes.rs
@@ -1,22 +1,23 @@
 use anyhow::{anyhow, Result};
-use data_model::DataObjectBuilder;
-use uuid::Uuid;
-use sha2::Sha256;
-use sha2::Digest;
 use axum::{
     extract::{Multipart, Path, State},
     response::IntoResponse,
     routing::{delete, get, post},
     Json, Router,
 };
-use futures::StreamExt;
+use data_model::DataObjectBuilder;
+use futures::{stream, StreamExt};
 use nanoid::nanoid;
-use tracing::info;
+use sha2::Digest;
+use sha2::Sha256;
 use std::sync::Arc;
+use tracing::info;
+use uuid::Uuid;
 
 use state_store::{
     requests::{
-        CreateComputeGraphRequest, DeleteComputeGraphRequest, InvokeComputeGraphRequest, NamespaceRequest, RequestType
+        CreateComputeGraphRequest, DeleteComputeGraphRequest, InvokeComputeGraphRequest,
+        NamespaceRequest, RequestType,
     },
     IndexifyState,
 };
@@ -25,7 +26,7 @@ use utoipa_swagger_ui::SwaggerUi;
 
 use crate::http_objects::{
     ComputeFn, ComputeGraph, ComputeGraphsList, CreateNamespace, DataObject, DynamicRouter,
-    GraphInvocations, IndexifyAPIError, Namespace, NamespaceList, Node,
+    GraphInvocations, IndexifyAPIError, IndexifyFile, Namespace, NamespaceList, Node,
 };
 
 #[derive(OpenApi)]
@@ -41,7 +42,7 @@
     ),
     components(
         schemas(
-            CreateNamespace,
+            CreateNamespace,
             NamespaceList,
             IndexifyAPIError,
             Namespace,
@@ -198,7 +199,11 @@ async fn create_compute_graph(
 
             let file_name = format!("{}_{}", namespace, nanoid!());
 
-            let put_result = state.blob_storage.put(&file_name, stream).await.map_err(|e| IndexifyAPIError::internal_error(e))?;
+            let put_result = state
+                .blob_storage
+                .put(&file_name, stream)
+                .await
+                .map_err(|e| IndexifyAPIError::internal_error(e))?;
             code_url = Some(put_result.url);
         } else if name == "compute_graph" {
             let text = field
@@ -363,44 +368,89 @@ async fn upload_data(
     State(state): State<RouteState>,
     mut files: Multipart,
 ) -> Result<(), IndexifyAPIError> {
-    while let Some(field) = files.next_field().await.unwrap() {
-        if let Some(_) = field.file_name() {
-            let name = Uuid::new_v4().to_string();
-            info!("writing to blob store, file name = {:?}", name);
-            let stream = field.map(|res| res.map_err(|err| anyhow::anyhow!(err)));
-            let mut hasher = Sha256::new();
-            let hashed_stream = stream.map(|item| {
-                item.map(|bytes| {
-                    hasher.update(&bytes);
-                    bytes
-                })
-            });
-            let res = state.blob_storage.put(&name, hashed_stream).await.map_err(|e| {
-                IndexifyAPIError::internal_error(anyhow!("failed to write to blob store: {}", e))
-            })?;
+    let mut metadata: Option<serde_json::Value> = None;
+    let mut url: Option<String> = None;
+    let mut hash: Option<String> = None;
 
-            //let hash= format!("{:x}", hasher.finalize());
-
-            let data_object = DataObjectBuilder::default()
-                .namespace(namespace.clone())
-                .compute_graph_name(compute_graph.clone())
-                .compute_fn_name("".to_string())
-                .payload_url(res.url)
-                .payload_hash(hasher.finalize().into())
-                .build()
-                .map_err(|e| {
-                    IndexifyAPIError::internal_error(anyhow!("failed to upload content: {}", e))
-                })?;
-
-            state.indexify_state.write(RequestType::InvokeComputeGraph(InvokeComputeGraphRequest{
-                namespace: namespace.clone(),
-                compute_graph_name: compute_graph.clone(),
-                data_object
-            })).await.map_err(|e| {
-                IndexifyAPIError::internal_error(anyhow!("failed to upload content: {}", e))
-            })?;
+    while let Some(field) = files.next_field().await.unwrap() {
+        if let Some(name) = field.name() {
+            if name == "file" {
+                let name = Uuid::new_v4().to_string();
+                info!("writing to blob store, file name = {:?}", name);
+                let stream = field.map(|res| res.map_err(|err| anyhow::anyhow!(err)));
+                let mut hasher = Sha256::new();
+                let hashed_stream = stream.map(|item| {
+                    item.map(|bytes| {
+                        hasher.update(&bytes);
+                        bytes
+                    })
+                });
+                let res = state
+                    .blob_storage
+                    .put(&name, hashed_stream)
+                    .await
+                    .map_err(|e| {
+                        IndexifyAPIError::internal_error(anyhow!(
+                            "failed to write to blob store: {}",
+                            e
+                        ))
+                    })?;
+                url = Some(res.url);
+                hash = Some(format!("{:x}", hasher.finalize()));
+            } else if name == "metadata" {
+                let text = field
+                    .text()
+                    .await
+                    .map_err(|e| IndexifyAPIError::bad_request(&e.to_string()))?;
+                let file_metadata = serde_json::from_str(&text)?;
+                metadata = Some(file_metadata);
+            }
+        }
+    }
+    if url.is_none() {
+        return Err(IndexifyAPIError::bad_request("file is required"));
+    }
+    let payload = IndexifyFile {
+        metadata: metadata.unwrap_or_default(),
+        url: url.unwrap(),
+        sha_256: hash.unwrap(),
+    };
+    let payload_json = serde_json::to_string(&payload)?;
+    let payload_hash = Sha256::digest(payload_json.as_bytes());
+    let payload_key = Uuid::new_v4().to_string();
+    let payload_stream = stream::once(async move {
+        let payload_json = payload_json.as_bytes().to_vec().clone();
+        Ok(payload_json.into())
+    });
+    let put_result = state
+        .blob_storage
+        .put(&payload_key, Box::pin(payload_stream))
+        .await
+        .map_err(|e| {
+            IndexifyAPIError::internal_error(anyhow!("failed to upload content: {}", e))
+        })?;
+    let data_object = DataObjectBuilder::default()
+        .namespace(namespace.clone())
+        .compute_graph_name(compute_graph.clone())
+        .compute_fn_name("".to_string())
+        .payload_url(put_result.url)
+        .payload_hash(payload_hash.into())
+        .build()
+        .map_err(|e| {
+            IndexifyAPIError::internal_error(anyhow!("failed to upload content: {}", e))
+        })?;
 
+    state
+        .indexify_state
+        .write(RequestType::InvokeComputeGraph(InvokeComputeGraphRequest {
+            namespace: namespace.clone(),
+            compute_graph_name: compute_graph.clone(),
+            data_object,
+        }))
+        .await
+        .map_err(|e| {
+            IndexifyAPIError::internal_error(anyhow!("failed to upload content: {}", e))
+        })?;
     Ok(())
 }
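
Note: the core of the fixed upload path is the hash-while-streaming pattern — the multipart byte stream is wrapped so every chunk updates a Sha256 hasher on its way to blob storage, and the digest is read out once the stream is drained. A self-contained sketch of that pattern, assuming the futures, sha2, bytes, tokio, and anyhow crates; the in-memory chunks stand in for the multipart field:

use futures::{stream, StreamExt};
use sha2::{Digest, Sha256};

#[tokio::main]
async fn main() {
    // Stand-in for the multipart field's byte stream.
    let chunks: Vec<Result<bytes::Bytes, anyhow::Error>> = vec![
        Ok(bytes::Bytes::from_static(b"hello ")),
        Ok(bytes::Bytes::from_static(b"world")),
    ];
    let mut hasher = Sha256::new();
    // Each chunk updates the hasher, then flows through unchanged.
    let hashed_stream = stream::iter(chunks).map(|item| {
        item.map(|bytes| {
            hasher.update(&bytes);
            bytes
        })
    });
    // In upload_data this stream is handed to blob_storage.put(...);
    // here we simply drain it.
    let uploaded: Vec<_> = hashed_stream.collect().await;
    assert!(uploaded.iter().all(|r| r.is_ok()));
    // Digest of everything that passed through: sha256("hello world").
    println!("sha_256 = {:x}", hasher.finalize());
}

This avoids buffering the whole file in memory just to hash it, which matters for large uploads.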

