
storage-node: assets PUT early handling of content-length

Mokhtar Naamani 4 years ago
commit 13a3b83157

+ 0 - 1
storage-node/packages/cli/src/commands/upload.ts

@@ -3,7 +3,6 @@ import fs from 'fs'
 import ipfsHash from 'ipfs-only-hash'
 import { ContentId, DataObject } from '@joystream/types/storage'
 import BN from 'bn.js'
-import { Option } from '@polkadot/types/codec'
 import { BaseCommand } from './base'
 import { DiscoveryClient } from '@joystream/service-discovery'
 import Debug from 'debug'

+ 1 - 0
storage-node/packages/colossus/package.json

@@ -62,6 +62,7 @@
     "express-openapi": "^4.6.1",
     "figlet": "^1.2.1",
     "http-proxy-middleware": "^1.0.5",
+    "ipfs-only-hash": "^1.0.2",
     "js-yaml": "^3.13.1",
     "lodash": "^4.17.11",
     "meow": "^7.0.1",

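The new `ipfs-only-hash` dependency lets colossus compute an IPFS content id locally, without talking to a running IPFS node — presumably in anticipation of the flush-time hashing TODO added in storage.js below. A minimal sketch of its API (the buffer contents here are illustrative):

```js
// Minimal sketch: ipfs-only-hash computes an IPFS CID locally,
// without a running IPFS node.
const ipfsHash = require('ipfs-only-hash')

async function main() {
  // Illustrative content; in colossus this would be the uploaded bytes.
  const data = Buffer.from('hello world')
  const hash = await ipfsHash.of(data)
  console.log(hash) // a CIDv0 string, e.g. starting with "Qm"
}

main()
```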
+ 29 - 15
storage-node/packages/colossus/paths/asset/v0/{id}.js

@@ -54,20 +54,9 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
         return
       }
 
-      // TODO: Do early filter on content_length..(do not wait for fileInfo)
-      // ensure it equals the data object claimed size, and less than max allowed by
-      // node policy.
-
-      // get content_length from request
-      // const fileSizesEqual = dataObject.size_in_bytes.eq(content_length)
-      // if (!fileSizesEqual) {
-      //   return res.status(403).send({ message: 'Upload size does not match expected size of content' })
-      // }
-
       const id = req.params.id // content id
 
-      // First check if we're the liaison for the name, otherwise we can bail
-      // out already.
+      // Check if we're the liaison for the content
       const roleAddress = runtime.identities.key.address
       const providerId = runtime.storageProviderId
       let dataObject
@@ -78,6 +67,21 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
         return
       }
 
+      // Early filtering on content_length (do not wait for fileInfo);
+      // ensure it's less than the max allowed by node policy.
+      const filterResult = filter({}, req.headers)
+
+      if (filterResult.code !== 200) {
+        errorHandler(res, new Error(filterResult.message), filterResult.code)
+        return
+      }
+
+      // Ensure the content_length from the request equals the size in the data object.
+      if (!dataObject.size_in_bytes.eq(filterResult.content_length)) {
+        errorHandler(res, new Error('Content Length does not match expected size of content'), 403)
+        return
+      }
+
       const sufficientBalance = await runtime.providerHasMinimumBalance(PROCESS_UPLOAD_BALANCE)
 
       if (!sufficientBalance) {
@@ -102,13 +106,23 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
           }
         }
 
-        // May occurs before entire stream is processed, at the end of stream
-        // or possibly never.
+        // May be emitted before the entire stream is processed. If there was an error
+        // detecting the file info at the end of the stream, info will be null.
         stream.on('fileInfo', async (info) => {
           try {
             debug('Detected file info:', info)
 
+            if (!info) {
+              // Do not process unknown content.
+              debug('Failed to detect content type!')
+              stream.end()
+              res.status(403).send({ message: 'Unknown content type' })
+              return
+            }
+
             // Filter allowed content types
+            // == We haven't computed the ipfs hash yet, so is it really fair to reject content?
+            // == It may not be the real uploader doing the upload!
             const filterResult = filter({}, req.headers, info.mimeType)
             if (filterResult.code !== 200) {
               debug('Rejecting content', filterResult.message)
@@ -129,6 +143,7 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
           }
         })
 
+        // The `finish` event comes before the `fileInfo` event if file info detection happened at end of stream.
         stream.on('finish', () => {
           try {
             finished = true
@@ -139,7 +154,6 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
         })
 
         stream.on('committed', async (hash) => {
-          console.log('commited', dataObject)
           try {
             if (hash !== dataObject.ipfs_content_id.toString()) {
               debug('Rejecting content. IPFS hash does not match value in objectId')
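The comments added above pin down a subtle event ordering: `finish` can fire before `fileInfo`, and `fileInfo` may deliver null when detection fails even at end-of-stream. A hedged sketch of the consumer pattern this implies — `stream` stands in for the write stream used above, and this is not the actual handler:

```js
// Sketch of the event ordering described above.
let finished = false

stream.on('fileInfo', (info) => {
  if (!info) {
    // Detection failed even at end-of-stream: reject the upload.
    return
  }
  // info.mimeType is only now safe to use for content filtering.
})

stream.on('finish', () => {
  // May fire before 'fileInfo' when detection only succeeds at
  // end-of-stream, so acceptance must wait for 'fileInfo' (or its
  // null failure signal) rather than for 'finish' alone.
  finished = true
})
```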

+ 17 - 2
storage-node/packages/storage/filter.js

@@ -94,13 +94,15 @@ function mimeMatchesAny(accept, reject, provided) {
 function filterFunc(config, headers, mimeType) {
   const filter = configDefaults(config)
 
+  const size = contentLengthFromHeaders(headers)
+
   // Enforce maximum file upload size
   if (filter.max_size) {
-    const size = parseInt(headers['content-length'], 10)
     if (!size) {
       return {
         code: 411,
         message: 'A Content-Length header is required.',
+        content_length: size,
       }
     }
 
@@ -108,21 +110,34 @@ function filterFunc(config, headers, mimeType) {
       return {
         code: 413,
         message: 'The provided Content-Length is too large.',
+        content_length: size,
       }
     }
   }
 
   // Enforce mime type based filtering
-  if (!mimeMatchesAny(filter.mime.accept, filter.mime.reject, mimeType)) {
+  if (mimeType && !mimeMatchesAny(filter.mime.accept, filter.mime.reject, mimeType)) {
     return {
       code: 415,
       message: 'Content has an unacceptable MIME type.',
+      content_length: size,
     }
   }
 
   return {
     code: 200,
+    content_length: size,
+  }
+}
+
+function contentLengthFromHeaders(headers) {
+  const content_length = headers['content-length']
+
+  if (!content_length) {
+    return null
   }
+
+  return parseInt(content_length, 10)
 }
 
 module.exports = filterFunc
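With the `mimeType && ...` guard, `filterFunc` can now be called in two phases: once early with only the request headers (an undefined mimeType skips the mime check), and again once the type has been detected. A minimal sketch, requiring filter.js directly (the import path inside colossus may differ):

```js
const filter = require('./filter')

// Example request headers; in colossus these come from the PUT request.
const headers = { 'content-length': '1048576' }

// Phase 1: headers only. Returns 411 if Content-Length is missing,
// 413 if it exceeds max_size, otherwise 200 plus the parsed length.
const early = filter({ max_size: 100 * 1024 * 1024 }, headers)
if (early.code === 200) {
  console.log('claimed upload size:', early.content_length)
}

// Phase 2: once the mime type is known, run the mime-based filtering.
const late = filter({}, headers, 'video/mp4')
if (late.code === 415) {
  console.log('unacceptable MIME type')
}
```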

+ 5 - 2
storage-node/packages/storage/storage.js

@@ -107,7 +107,7 @@ class StorageWriteStream extends Transform {
 
       if (this.buf >= fileType.minimumBytes) {
         const info = fileType(this.buf)
-        // No info? We can try again at the end of the stream.
+        // No info? We will try again at the end of the stream.
         if (info) {
           this.fileInfo = fixFileInfo(info)
           this.emit('fileInfo', this.fileInfo)
@@ -122,7 +122,10 @@ class StorageWriteStream extends Transform {
     debug('Flushing temporary stream:', this.temp.path)
     this.temp.end()
 
+    // TODO: compute the ipfs hash and include it in the emitted fileInfo event
+
     // Since we're finished, we can try to detect the file type again.
+    // If we can't detect the type, we should still emit the event with some indication of the detection error.
     if (!this.fileInfo) {
       const read = fs.createReadStream(this.temp.path)
       fileType
@@ -133,6 +136,7 @@ class StorageWriteStream extends Transform {
         })
         .catch((err) => {
           debug('Error trying to detect file type at end-of-stream:', err)
+          this.emit('fileInfo', null)
         })
     }
 
@@ -143,7 +147,6 @@ class StorageWriteStream extends Transform {
    * Commit this stream to the IPFS backend.
    */
   commit() {
-    // Create a read stream from the temp file.
     if (!this.temp) {
       throw new Error('Cannot commit a temporary stream that does not exist. Did you call cleanup()?')
     }
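The flush-time TODO above could be served by the `ipfs-only-hash` dependency this commit adds: hash the temp file once the stream is flushed and fold the result into the emitted `fileInfo`. A hedged sketch only — the helper name and event payload shape are illustrative, not part of this commit:

```js
const fs = require('fs')
const ipfsHash = require('ipfs-only-hash')

// Hypothetical helper sketching the TODO: compute the ipfs hash of the
// flushed temp file and attach it to the fileInfo payload.
async function fileInfoWithHash(tempPath, fileInfo) {
  // Reading the whole file into memory is only illustrative; a streaming
  // import would be preferable for large uploads.
  const data = fs.readFileSync(tempPath)
  const hash = await ipfsHash.of(data)
  return { ...fileInfo, ipfsContentId: hash }
}
```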