123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408 |
- 'use strict'
- const { Transform } = require('stream')
- const fs = require('fs')
- const debug = require('debug')('joystream:storage:storage')
- const Promise = require('bluebird')
- Promise.config({
- cancellation: true,
- })
- const fileType = require('file-type')
- const ipfsClient = require('ipfs-http-client')
- const temp = require('temp').track()
- const _ = require('lodash')
// Timeout in milliseconds applied when the storage options do not supply one.
const DEFAULT_TIMEOUT = 30 * 1000

/**
 * Fallback content ID resolver: logs a warning and hands the ID back
 * unchanged.
 *
 * @param {*} cid - content ID to resolve
 * @returns {Promise<*>} resolves with the very same `cid`
 */
const DEFAULT_RESOLVE_CONTENT_ID = async (cid) => {
  debug('Warning: Default resolution returns original CID', cid)
  return cid
}
// File info reported when the content type cannot be detected. Frozen so the
// shared fallback can never be modified through a value handed to a consumer.
const DEFAULT_FILE_INFO = Object.freeze({
  mimeType: 'application/octet-stream',
  ext: 'bin',
})

/**
 * Normalise a file type detection result into the shape used by this module,
 * i.e. with the `mime` property renamed to `mimeType`.
 *
 * The argument is never modified; a new object is returned on every call, so
 * callers may freely change the result. Passing an already normalised object
 * (one without `mime`) returns an unchanged copy of it.
 *
 * @param {object|null|undefined} info - detection result, or a falsy value
 *   when nothing was detected
 * @returns {object} normalised file info; a copy of the default
 *   (`application/octet-stream` / `bin`) when `info` is falsy
 */
function fixFileInfo(info) {
  if (!info) {
    return Object.assign({}, DEFAULT_FILE_INFO)
  }

  const fixed = Object.assign({}, info)
  if ('mime' in fixed) {
    fixed.mimeType = fixed.mime
    delete fixed.mime
  }
  return fixed
}
/**
 * Replace the `fileType` property of a stream with a normalised `fileInfo`
 * property (see fixFileInfo).
 *
 * @param {object} stream - stream object carrying a `fileType` property
 * @returns {object} the same stream, now carrying `fileInfo` instead
 */
function fixFileInfoOnStream(stream) {
  const detected = stream.fileType
  const normalised = fixFileInfo(detected)
  delete stream.fileType
  stream.fileInfo = normalised
  return stream
}
/**
 * Write side of the storage: every chunk written to this stream is spooled
 * into a temporary file, which `commit()` later adds to IPFS.
 *
 * Nothing is ever pushed to the readable side of the Transform.
 *
 * Events emitted in addition to the regular stream events:
 * - `fileInfo`  (info)  once the file type of the content has been detected
 * - `committed` (hash)  once the temporary file has been added to IPFS
 * - `error`     (err)   when adding or pinning the content fails
 */
class StorageWriteStream extends Transform {
  /**
   * @param {object} storage - owner of the `ipfs` client used by `commit()`
   * @param {object} [options] - Transform stream options (shallow-copied)
   */
  constructor(storage, options) {
    super(_.clone(options || {}))
    this.storage = storage

    // Spool file receiving all written data.
    this.temp = temp.createWriteStream()
    // Leading bytes of the content, kept only for file type detection.
    this.buf = Buffer.alloc(0)
  }

  /**
   * Spool one chunk to the temporary file and, until enough leading bytes
   * have been seen, try to detect the file type from them.
   *
   * @param {Buffer|string} chunk
   * @param {string} encoding - encoding of `chunk` when it is a string
   * @param {Function} callback - invoked once the chunk has been handed to
   *   the temporary file, with the write error if there was one
   */
  _transform(chunk, encoding, callback) {
    // Deal with buffers only.
    const data = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk

    // Compare byte counts: comparing the Buffer object itself with a number
    // coerces it to NaN and detection would never trigger.
    if (!this.fileInfo && this.buf.length < fileType.minimumBytes) {
      this.buf = Buffer.concat([this.buf, data])

      if (this.buf.length >= fileType.minimumBytes) {
        const info = fileType(this.buf)
        // No info? Detection is retried at the end of the stream.
        if (info) {
          this.fileInfo = fixFileInfo(info)
          this.emit('fileInfo', this.fileInfo)
        }
      }
    }

    // Wait for the write so that backpressure from the temporary file is
    // honoured and write errors reach the writer.
    this.temp.write(data, (err) => callback(err || null))
  }

  /**
   * Finish the temporary file. Completion is signalled only after the file
   * has been flushed, so the content is complete by the time this stream
   * reports `finish`. If no file type was detected while streaming, detection
   * is retried on the finished file; that retry is best-effort and does not
   * delay completion.
   *
   * @param {Function} callback
   */
  _flush(callback) {
    // Remember the path: cleanup() may drop `this.temp` before we are done.
    const tempPath = this.temp.path
    debug('Flushing temporary stream:', tempPath)

    this.temp.end((err) => {
      if (err) {
        callback(err)
        return
      }

      if (!this.fileInfo) {
        fileType
          .stream(fs.createReadStream(tempPath))
          .then((stream) => {
            this.fileInfo = fixFileInfoOnStream(stream).fileInfo
            this.emit('fileInfo', this.fileInfo)
          })
          .catch((detectErr) => {
            debug('Error trying to detect file type at end-of-stream:', detectErr)
          })
      }

      callback(null)
    })
  }

  /**
   * Add the temporary file to IPFS and pin it. Emits `committed` with the
   * resulting hash as soon as the file has been added (before pinning), and
   * `error` if adding or pinning fails.
   *
   * @throws {Error} when the temporary file was already released by cleanup()
   */
  commit() {
    if (!this.temp) {
      throw new Error('Cannot commit a temporary stream that does not exist. Did you call cleanup()?')
    }

    debug('Committing temporary stream: ', this.temp.path)
    this.storage.ipfs
      .addFromFs(this.temp.path)
      .then(async (result) => {
        const hash = result[0].hash
        debug('Stream committed as', hash)
        this.emit('committed', hash)
        await this.storage.ipfs.pin.add(hash)
      })
      .catch((err) => {
        debug('Error committing stream', err)
        this.emit('error', err)
      })
  }

  /**
   * Remove the temporary file and forget about it. Safe to call more than
   * once; removal failures are logged, never thrown.
   */
  cleanup() {
    if (!this.temp) {
      // Already cleaned up.
      return
    }

    const tempPath = this.temp.path
    debug('Cleaning up temporary file: ', tempPath)
    delete this.temp

    fs.unlink(tempPath, (err) => {
      // Best effort only: a leftover temporary file is not fatal.
      if (err) {
        debug('Error removing temporary file:', err)
      }
    })
  }
}
/**
 * Storage backed by an IPFS node reached through an HTTP client.
 *
 * Content is addressed by content IDs, which are translated into IPFS hashes
 * by the `resolve_content_id` option. Operations that talk to IPFS are bounded
 * by a timeout.
 */
class Storage {
  /**
   * Create and initialise a storage instance.
   *
   * @param {object} [options]
   * @param {number} [options.timeout] - default timeout in milliseconds
   * @param {Function} [options.resolve_content_id] - async function mapping a
   *   content ID to an IPFS hash
   * @param {object} [options.ipfs] - `connect_options` of this object are
   *   passed to the IPFS client
   * @returns {Storage}
   */
  static create(options) {
    const storage = new Storage()
    storage._init(options)
    return storage
  }

  /**
   * Apply option defaults, create the IPFS client and probe the node.
   *
   * @param {object} [options] - see `create()`
   */
  _init(options) {
    this.options = _.clone(options || {})
    this.options.ipfs = this.options.ipfs || {}

    this._timeout = this.options.timeout || DEFAULT_TIMEOUT
    this._resolve_content_id = this.options.resolve_content_id || DEFAULT_RESOLVE_CONTENT_ID

    this.ipfs = ipfsClient(this.options.ipfs.connect_options)

    // Hashes that are pinned / currently being pinned by synchronize().
    this.pinned = {}
    this.pinning = {}

    this.ipfs.id((err, identity) => {
      if (err) {
        debug(`Warning IPFS daemon not running: ${err.message}`)
      } else {
        debug(`IPFS node is up with identity: ${identity.id}`)
        // NOTE(review): presumably turns off the localhost public gateway
        // entry of the node - confirm against the IPFS gateway configuration.
        this.ipfs.config.set('Gateway.PublicGateways', { 'localhost': null })
      }
    })
  }

  /**
   * Run a promise executor, failing if it does not settle in time.
   *
   * @param {number} [timeout] - milliseconds; falls back to the instance default
   * @param {Function} operation - executor `(resolve, reject) => ...`
   * @returns {Promise<*>} settles like `operation`, or rejects on timeout
   */
  async withSpecifiedTimeout(timeout, operation) {
    // `Promise` is expected to provide `.timeout()` (as the promise library
    // required by this file does); no extra wrapper promise is needed.
    return new Promise(operation).timeout(timeout || this._timeout)
  }

  /**
   * Resolve a content ID to an IPFS hash, bounded by a timeout.
   *
   * @param {number} [timeout] - milliseconds
   * @param {*} contentId
   * @returns {Promise<*>} the resolved hash
   */
  async resolveContentIdWithTimeout(timeout, contentId) {
    return this.withSpecifiedTimeout(timeout, async (resolve, reject) => {
      try {
        resolve(await this._resolve_content_id(contentId))
      } catch (err) {
        reject(err)
      }
    })
  }

  /**
   * Stat the content behind a content ID.
   *
   * @param {*} contentId
   * @param {number} [timeout] - milliseconds, applied to resolution and to the
   *   stat call separately
   * @returns {Promise<object>} result of the IPFS `files.stat` call
   */
  async stat(contentId, timeout) {
    const resolved = await this.resolveContentIdWithTimeout(timeout, contentId)

    return this.withSpecifiedTimeout(timeout, (resolve, reject) => {
      this.ipfs.files.stat(`/ipfs/${resolved}`, { withLocal: true }, (err, res) => {
        if (err) {
          reject(err)
          return
        }
        resolve(res)
      })
    })
  }

  /**
   * Size of the content behind a content ID, as reported by `stat()`.
   *
   * @param {*} contentId
   * @param {number} [timeout] - milliseconds
   * @returns {Promise<number>}
   */
  async size(contentId, timeout) {
    const stat = await this.stat(contentId, timeout)
    return stat.size
  }

  /**
   * Open content for reading (`'r'`) or writing (`'w'`).
   *
   * @param {*} contentId
   * @param {string} mode - `'r'` or `'w'`
   * @param {number} [timeout] - milliseconds
   * @returns {Promise<object>} a read stream or a StorageWriteStream
   * @throws {Error} (as rejection) for any other mode
   */
  async open(contentId, mode, timeout) {
    if (mode !== 'r' && mode !== 'w') {
      // Only list the modes that are actually accepted.
      throw new Error('The only supported modes are "r" and "w".')
    }

    if (mode === 'w') {
      return this.createWriteStream(contentId, timeout)
    }

    return this.createReadStream(contentId, timeout)
  }

  /**
   * Create a stream that spools written data for a later `commit()`.
   * Arguments are accepted for symmetry with `createReadStream()` but unused.
   *
   * @returns {Promise<StorageWriteStream>}
   */
  async createWriteStream() {
    return new StorageWriteStream(this)
  }

  /**
   * Create a read stream for the content behind a content ID. The returned
   * stream carries a `fileInfo` property describing the detected file type.
   *
   * @param {*} contentId
   * @param {number} [timeout] - milliseconds
   * @returns {Promise<object>} readable stream with `fileInfo`
   * @throws {Error} (as rejection) when the content is not found, the IPFS
   *   stream fails, file type detection fails, or the timeout elapses
   */
  async createReadStream(contentId, timeout) {
    const resolved = await this.resolveContentIdWithTimeout(timeout, contentId)

    return this.withSpecifiedTimeout(timeout, (resolve, reject) => {
      let found = false
      const ls = this.ipfs.getReadableStream(resolved)

      ls.on('data', (result) => {
        if (result.path !== resolved) {
          return
        }
        found = true
        // Route detection failures into the returned promise instead of
        // leaving an unhandled rejection behind.
        fileType
          .stream(result.content)
          .then((ftStream) => resolve(fixFileInfoOnStream(ftStream)))
          .catch(reject)
      })

      ls.on('error', (err) => {
        ls.end()
        debug(err)
        reject(err)
      })

      ls.on('end', () => {
        if (!found) {
          // Error() takes a single message: interpolate the ID into it.
          const err = new Error(`No matching content found for ${contentId}`)
          debug(err)
          reject(err)
        }
      })

      ls.resume()
    })
  }

  /**
   * Make sure the content behind a content ID is pinned on the IPFS node.
   * Pinning is started only if the hash is neither pinned nor being pinned.
   *
   * @param {*} contentId
   * @param {Function} [callback] - `(err, status)`; `status` has the shape
   *   returned by `syncStatus()`. Not invoked when resolving the content ID
   *   fails; in that case the returned promise rejects.
   * @returns {Promise<void>}
   */
  async synchronize(contentId, callback) {
    const resolved = await this.resolveContentIdWithTimeout(this._timeout, contentId)

    if (this.pinning[resolved] || this.pinned[resolved]) {
      callback && callback(null, this.syncStatus(resolved))
      return
    }

    debug(`Pinning hash: ${resolved} content-id: ${contentId}`)
    this.pinning[resolved] = true

    this.ipfs.pin.add(resolved, { quiet: true, pin: true }, (err) => {
      delete this.pinning[resolved]
      if (err) {
        debug(`Error Pinning: ${resolved}`)
        callback && callback(err)
      } else {
        debug(`Pinned ${resolved}`)
        this.pinned[resolved] = true
        callback && callback(null, this.syncStatus(resolved))
      }
    })
  }

  /**
   * Report the pinning state of an IPFS hash.
   *
   * @param {string} ipfsHash
   * @returns {{syncing: boolean, synced: boolean}}
   */
  syncStatus(ipfsHash) {
    return {
      syncing: this.pinning[ipfsHash] === true,
      synced: this.pinned[ipfsHash] === true,
    }
  }
}
- module.exports = {
- Storage,
- }
|