API

The Distributor Node, depending on the configuration, can expose either one or two HTTP APIs, both implemented with ExpressJS.

Public API

Public API is enabled by default and can be used to retrieve assets from the node as well as some basic information about its current status.

Public API is described by an OpenAPI schema located at src/api-spec/public.yml

Full public API documentation can be found here

Operator API

A secured operator API can be enabled with config.operatorApi. The operator API makes it possible to remotely execute some operations on a running node (e.g. changing supported buckets).

Operator API is described by an OpenAPI schema located at src/api-spec/operator.yml

Full operator API documentation can be found here

Requesting assets

Assets are requested from the distributor node by sending a GET request to the /assets/{objectId} endpoint.

There are multiple scenarios of how a distributor will act upon that request, depending on its current state:

Scenario 1 (cache hit)

The requested data object is already available in the distributor node's filesystem (cache)

In this case:

  • Object's LRU-SP cache state is updated
  • The send library is used to handle the request and serve the object. The library supports, among others, partial responses (Ranges) and conditional-GET negotiation (If-Match, If-Unmodified-Since, If-None-Match, If-Modified-Since).
  • cache-control: max-age is set to 31536000 (one year), which is a common practice for informing the browser that the object can essentially be cached "forever" (minimizing the number of requests for the same data object)
  • x-cache: hit and x-data-source: local headers are sent, providing the client detailed information about the triggered scenario (see: public.assets Responses).

Scenario 2 (pending)

The object is not yet cached, but is currently being fetched from the storage node

In this case cache-control: max-age is set to a substantially lower value (currently 180), as the distributor node cannot yet confirm whether the object being fetched is indeed valid.

Scenario 2.1: No Range header was provided with a request or the Range start is <= partiallyDownloadedContentSize

In this case:

  • The data is streamed into the response from the local, partially downloaded file. As data downloaded from the storage node is written into the local file, it is simultaneously read from the file (by polling at a small interval) and immediately pushed into the HTTP response.
  • x-cache: pending and x-data-source: local headers are sent, providing the client detailed information about the triggered scenario (see: public.assets Responses).

Scenario 2.2: Range header was provided with a request and Range start is > partiallyDownloadedContentSize

In this case, streaming the response from the partially downloaded file, as in the scenario above, may cause unnecessary delay, because the requested Range may target the very end of the file (which will only be available locally once the entire data object is fetched). That's why in this case:

  • The request is forwarded to the storage node (that the data object is currently being downloaded from) via express-http-proxy
  • x-cache: pending and x-data-source: external headers are sent, providing the client detailed information about the triggered scenario (see: public.assets Responses).

Scenario 3 (cache miss)

In this case the distributor node is making an additional request to the query node in order to fetch details of the requested object, including:

  • content hash,
  • object size,
  • storage buckets assigned to store the object,
  • distribution buckets assigned to distribute the object

It then proceeds to one of the following scenarios:

Scenario 3.1: The requested data object is not found

Node responds with HTTP 404 (Not Found) and a message

Scenario 3.2: The object is not distributed by the node

Node responds with HTTP 421 (Misdirected Request) and a message

Scenario 3.3: The request is valid, the node needs to fetch the missing object

In this case:

  • The process of fetching the data object from a storage node, described in the Data fetching section below, is triggered.
  • Once the storage node from which the object is going to be fetched is chosen, the request is handled in a way analogous to the one described in Scenario 2, with the exception that x-cache: miss header will be sent instead of x-cache: pending.
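
The scenarios above can be summarised as a small decision function. The following is only a sketch, not the node's actual request handler: the type and function names are hypothetical, and it assumes that a fresh cache miss starts with zero downloaded bytes and uses the same max-age as the pending scenario. Only the header values, the max-age numbers and the 404 / 421 status codes are taken from the text.

```typescript
// Hypothetical model of the distributor's knowledge about a requested object.
type ObjectState =
  | { kind: "cached" }
  | { kind: "pending"; partiallyDownloadedContentSize: number }
  | { kind: "miss"; found: boolean; distributedByNode: boolean };

type AssetResponsePlan =
  | { errorStatus: 404 | 421 }
  | { xCache: "hit" | "pending" | "miss"; xDataSource: "local" | "external"; maxAge: number };

const MAX_AGE_CACHED = 31536000; // one year (Scenario 1)
const MAX_AGE_PENDING = 180; // Scenario 2

// Scenario 2.1 vs 2.2: stream from the partial local file unless the
// requested range starts beyond what has been downloaded so far.
function pickDataSource(rangeStart: number | undefined, downloaded: number): "local" | "external" {
  return rangeStart === undefined || rangeStart <= downloaded ? "local" : "external";
}

function planAssetResponse(state: ObjectState, rangeStart?: number): AssetResponsePlan {
  switch (state.kind) {
    case "cached":
      return { xCache: "hit", xDataSource: "local", maxAge: MAX_AGE_CACHED };
    case "pending":
      return {
        xCache: "pending",
        xDataSource: pickDataSource(rangeStart, state.partiallyDownloadedContentSize),
        maxAge: MAX_AGE_PENDING,
      };
    case "miss":
      if (!state.found) return { errorStatus: 404 }; // Scenario 3.1
      if (!state.distributedByNode) return { errorStatus: 421 }; // Scenario 3.2
      // Scenario 3.3. Assumptions: nothing is downloaded yet, and max-age
      // matches the pending scenario.
      return { xCache: "miss", xDataSource: pickDataSource(rangeStart, 0), maxAge: MAX_AGE_PENDING };
  }
}
```

For example, a request with a Range starting at byte 5000 for an object with only 1000 bytes downloaded so far would be planned as x-cache: pending with x-data-source: external.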

Checking asset status

It is possible to check an asset's status without affecting the distributor node state in any way (for example, without triggering the process of fetching a missing data object) by sending a HEAD request to the /assets/{objectId} endpoint.

If the request is valid, the node will respond with, among others, the x-cache, content-length, cache-control headers.

If the request is invalid, the node will respond with the same status code it would return for an invalid GET request.

API limits

The node does not enforce any rate / connection limits on incoming requests. It is therefore recommended to use a firewall or reverse proxy to protect the node from DoS/DDoS attacks.

The outbound connections (from the distributor node to storage nodes), however, can be limited with the limits configuration settings.

Example Nginx configuration

http {
  # The upstream block must be declared inside the http context
  upstream distributor {
    server 127.0.0.1:3334;
  }

  # Create a conn_perip zone that will keep track of concurrent connections by IP
  limit_conn_zone $binary_remote_addr zone=conn_perip:10m;

  server {
    server_name example-distributor-node;
    listen 443;

    # Limit to max 20 connections per IP at a time
    # (the zone name must match the one declared in limit_conn_zone)
    limit_conn conn_perip 20;

    location / {
      proxy_pass http://distributor/;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "upgrade";
      proxy_set_header Host $http_host;

      proxy_set_header X-Real-IP $remote_addr;
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header X-Forwarded-Proto $scheme;
      proxy_set_header X-Nginx-Proxy true;

      proxy_redirect off;
    }

    # SSL and other configuration...
  }
}

Because Nginx does not support HTTP pipelining, by limiting the number of concurrent connections per IP we also limit the number of data objects that can be concurrently fetched from the distributor node by a single IP.

Keeping in mind that most browsers will not make more than 6 concurrent connections to a single host, the limit of 20 concurrent connections per IP should be more than sufficient.

System configuration

When configuring the limits, keep in mind that a lot of simultaneous connections may also cause some OS limits to be hit.

For example, the default limit of file descriptors a single process can open on Linux systems is 1024. If left unchanged, this limit can easily cause problems, as this means only 1024 connections can be handled concurrently. In reality this number will be much lower for distributor node, because:

  • Each connection will require 1 file descriptor for a socket
  • Each incoming connection will most likely require an asset (data object) file to be accessed, which will take another descriptor,
  • Each incoming connection may trigger many outbound connections (see Data fetching section below) in case of cache miss, in worst case taking over 10 more descriptors

For Linux users it is recommended to either run the distributor node using the Docker image, which already has high limits set, or raise the max open file descriptors limit manually.
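
As a rough worked example of the arithmetic above, the sketch below estimates how many incoming connections fit into a given descriptor limit. The function name and the `reserved` parameter are hypothetical, and the figure of 10 outbound descriptors per cache miss is only the approximate worst case mentioned in the list above.

```typescript
// Worst-case descriptors per incoming connection, following the list above:
// 1 socket + 1 asset file + outbound storage-node connections on a cache miss.
function estimateMaxIncomingConnections(
  fdLimit: number,
  outboundPerMiss: number = 10, // approximate worst case from the text
  reserved: number = 0 // hypothetical: descriptors used for logs, state files etc.
): number {
  const perConnection = 1 + 1 + outboundPerMiss;
  return Math.floor(Math.max(0, fdLimit - reserved) / perConnection);
}

// With the default Linux limit of 1024 descriptors:
const worstCase = estimateMaxIncomingConnections(1024); // 85 (every request is a cache miss)
const allCacheHits = estimateMaxIncomingConnections(1024, 0); // 512 (no outbound connections)
```

Real workloads fall somewhere between these two bounds, depending on the cache hit ratio.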

Data fetching

Finding nearby storage nodes:

In order to limit the number of requests being made on cache miss and the time it takes to fetch a new object in this scenario, the distributor node needs to keep track of how quickly (on average) the currently available storage nodes are responding to requests.

This can be partially solved by making use of the on-chain metadata provided by storage node operators, which may include details about the node location (see Metadata section) that can provide some estimation of which nodes will likely respond faster. However, because this approach is quite limited and it's possible that most storage providers will choose not to expose their node location, the distributor node instead uses a different approach to find nearby nodes.

Currently the distributor node periodically (every intervals.checkStorageNodeResponseTimes seconds) fetches all active storage provider endpoints (from the query node) and measures their average response times to /status/version requests. This is done independently of any incoming requests. The "response time check" requests are queued using a relatively small concurrency limit (10) in order to make the cost of this operation minimal.

This provides a reasonably good estimate of which nodes will likely be the best candidates for fetching data objects during a cache miss. It also allows filtering out storage nodes that don't respond at all or respond with an error.

Data object fetching flow

During the cache miss scenario (Scenario 3.3), the following tasks are executed:

First, the endpoints of all storage providers that are supposed to store the given object are ordered by their mean response time, calculated from the last 10 response times (the process of obtaining those measurements is described in the previous section).
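
The ordering step can be sketched as follows. This is an illustration only: the class and method names are hypothetical, and treating endpoints without any measurements as the slowest is an assumption rather than documented behaviour.

```typescript
const MAX_SAMPLES = 10; // number of most recent response times kept per endpoint

class ResponseTimeTracker {
  private samplesByEndpoint = new Map<string, number[]>();

  // Store a new measurement, keeping only the last MAX_SAMPLES values.
  record(endpoint: string, responseTimeMs: number): void {
    const samples = this.samplesByEndpoint.get(endpoint) ?? [];
    samples.push(responseTimeMs);
    if (samples.length > MAX_SAMPLES) samples.shift();
    this.samplesByEndpoint.set(endpoint, samples);
  }

  // Mean of the stored samples; unknown endpoints are treated as slowest (assumption).
  meanResponseTime(endpoint: string): number {
    const samples = this.samplesByEndpoint.get(endpoint);
    if (!samples || samples.length === 0) return Number.POSITIVE_INFINITY;
    return samples.reduce((sum, t) => sum + t, 0) / samples.length;
  }

  // Returns a new array ordered from the fastest to the slowest endpoint.
  sortByMeanResponseTime(endpoints: string[]): string[] {
    return endpoints.slice().sort((a, b) => {
      const meanA = this.meanResponseTime(a);
      const meanB = this.meanResponseTime(b);
      return meanA === meanB ? 0 : meanA < meanB ? -1 : 1;
    });
  }
}
```

The sorted list would then determine the order in which availability checks are queued.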

The HEAD /files/{objectId} requests are then sent to the storage endpoints, starting from the ones with lowest mean response time. Those initial requests are only meant to determine whether a given storage node can indeed serve the object. In fact, all those requests are put (in the specified order) in the availabilityCheckQueue which then executes them with a constant maximum concurrency (10 at the time of writing).

As soon as any storage node confirms the availability of the object, the availabilityCheckQueue is temporarily stopped and GET /files/{objectId} request is made to fetch the full data from the selected provider. Because the distributor node uses Connection: keep-alive headers when sending requests to storage nodes, there's no need to re-establish a TCP connection at this point, which can save a considerable amount of time. If other storage providers confirm the availability of the object during this time, other GET requests will be added to objectDownloadQueue (which uses a concurrency of 1), allowing the distributor node to instantly try a different provider in case the first GET request fails. The process continues until a storage node that responds with HTTP 200 to a GET request is found.

Once some storage node responds with HTTP 200 and starts streaming the data, all other requests related to that data object are stopped and the distributor node begins to write the data into its filesystem. Any errors at this point (unexpected data size, stream errors) will mean that the fetching process has failed, causing the data object and any related state to be dropped and the whole process of fetching the object to potentially be re-tried upon another request.

Metadata

The documentation of the current storage & distribution system on-chain metadata standard can be found here

Distributor node metadata can be set using the operator:set-metadata command in the Distributor Node CLI.

Distribution family metadata can be set using the leader:set-bucket-family-metadata command.

Once set, the metadata can be accessed from the Query Node with a GraphQL query like, for example:

query {
  distributionBuckets {
    family {
      metadata {
        region
        description
        latencyTestTargets
        areas {
          area {
            __typename
            ...on GeographicalAreaCountry {
              countryCode: code
            }
            ...on GeographicalAreaContinent {
              continentCode: code
            }
            ...on GeographicalAreaSubdivistion {
              subdivisionCode: code
            }
          }
        }
      }
    }
    operators {
      metadata {
        nodeEndpoint
        nodeLocation {
          countryCode
          coordinates {
            latitude
            longitude
          }
        }
        extra
      }
    }
  }
}

DistributionBucketFamilyMetadata

The main purpose of distribution family metadata is to help client (frontend) applications find out which distribution nodes should be preferred when fetching assets.

Although each node operator may choose to expose its own node location in the DistributionBucketOperatorMetadata, it is generally assumed that all nodes belonging to a given family will have a good-enough latency in the region covered by this family, so they can be treated more-or-less equally.

What exactly constitutes a region in the DistributionBucketFamilyMetadata is not strictly enforced and the current metadata standard remains quite flexible in that regard.

Geographical areas covered by the distribution bucket family

Initially, as the number of distribution nodes will probably be limited, a region can mean a relatively large geographic area (e.g. a continent or part of a continent). Later, as the network grows, the region may mean a single country / subdivision or a small set of nearby countries / subdivisions.

In order to support all those cases, the areas field in the DistributionBucketFamilyMetadata allows specifying either one or multiple geographical areas covered by the family, where each area can be either:

  • a continent uniquely identified by Continent enum value, or
  • a country uniquely identified by ISO 3166-1 alpha-2 country code, or
  • a subdivision (for example, a state) uniquely identified by ISO 3166-2 subdivision code

There are multiple ways client applications may be able to determine most suitable regions:

  • Using HTML5 geolocation API and reverse geocoding (which can be done either using a local dataset, custom backend or an external service)
  • Using GeoDNS or a backend service to establish the approximate location before rendering the interface
  • Prompting the user to manually provide the preferred region
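
Once a client has an approximate location, matching it against the areas of each family could look like the sketch below. The area shapes follow the aliases used in the GraphQL query above; `familyId`, `ClientLocation` and the "more specific area wins" ranking are hypothetical choices made for illustration, not part of the metadata standard.

```typescript
// Area shapes as returned by the example query (note the aliased code fields).
type Area =
  | { __typename: "GeographicalAreaContinent"; continentCode: string }
  | { __typename: "GeographicalAreaCountry"; countryCode: string }
  | { __typename: "GeographicalAreaSubdivistion"; subdivisionCode: string };

interface FamilyAreas {
  familyId: string; // hypothetical identifier, not requested in the example query
  areas: { area: Area }[];
}

interface ClientLocation {
  continentCode?: string;
  countryCode?: string;
  subdivisionCode?: string;
}

// 0 = no match; higher values mean a more specific match (assumed ranking).
function areaSpecificity(area: Area, location: ClientLocation): number {
  switch (area.__typename) {
    case "GeographicalAreaSubdivistion":
      return area.subdivisionCode === location.subdivisionCode ? 3 : 0;
    case "GeographicalAreaCountry":
      return area.countryCode === location.countryCode ? 2 : 0;
    case "GeographicalAreaContinent":
      return area.continentCode === location.continentCode ? 1 : 0;
  }
}

// Returns ids of matching families, most specific match first.
function rankFamilies(families: FamilyAreas[], location: ClientLocation): string[] {
  return families
    .map((family) => ({
      id: family.familyId,
      score: Math.max(0, ...family.areas.map((a) => areaSpecificity(a.area, location))),
    }))
    .filter((entry) => entry.score > 0)
    .sort((x, y) => y.score - x.score)
    .map((entry) => entry.id);
}
```

A client located in a country covered by one family and on a continent covered by another would get both families back, with the country-level family ranked first.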

Using latency tests for choosing a family

Another way to choose the most appropriate region may be to perform an initial latency check by pinging endpoints that are supposed to give the most representative results for given family (for example, https://www.cloudping.info/ can perform such measurements using endpoints that represent AWS regions).

In order to facilitate this, the latency_test_targets field in the DistributionBucketFamilyMetadata allows specifying a list of representative IPs / hosts to be used for such measurements. Alternatively, a chosen set of distribution nodes themselves may also be used.

Distribution bucket operator metadata

The most essential part of DistributionBucketOperatorMetadata is the node API root endpoint. It must be provided by all active node operators, otherwise no app will be able to access the node.

The node operator may optionally choose to expose more details about the node, like specific location metadata or some additional extra information.

State

The distributor node state is divided into memory state (recreated on startup) and persistent state (stored in filesystem).

Most of the state is managed via an "intermediary" service called StateCacheService. This is to facilitate a potential future migration to other state management approaches, such as Redis. Currently the service automatically saves the persistent state to the filesystem every intervals.saveCacheState seconds. It also tries to save the state every time the node is exiting.

The current state includes:

Memory state

  • pendingDownloadsByObjectId map - stores information about currently pending downloads (data object fetching attempts). Each pending download can be in one of the following states:
    • Waiting - in case limits.maxConcurrentStorageNodeDownloads limit is reached, this is the status of pending downloads that are waiting in the queue. It is also the initial status of all pending downloads in general.
    • LookingForSource - the process of looking for a storage node that is able to serve the asset has started, but the source node has not yet been chosen.
    • Downloading - the source storage node has been chosen and the data object is being downloaded.
  • storageNodeEndpointDataByEndpoint map - currently stores the last 10 response times (used to calculate the mean response time) mapped by storage node endpoint (see: Finding nearby storage nodes)
  • groupNumberByObjectId map - stores the LRU-SP cache group number (see: Caching policy) of each cached data object.

Persistent state

  • lruCacheGroups - list of LRU-SP cache groups. Each LRU group contains a map of cached data object details (size, popularity, last access time) required to calculate its cost parameter (see: Caching policy)
  • mimeTypeByObjectId map - stores the mimeType (as determined by the distributor node) of each cached data object

Caching

Caching policy

The caching policy used for the data objects stored by the distributor node is called LRU-SP.

This caching policy was designed specifically for the web and it takes into account the following 3 properties of a data object:

  • object size (s)
  • object popularity (number of times it was requested while being cached) (p)
  • time elapsed since the object was last requested (t)

The cost function of a cache item is described by the formula: t * s / p. Objects with highest cost are the first to be evicted in case limits.storage limit is reached.

LRU groups

For efficiency, the cache is divided into LRU (Least recently used) sets (groups) such that all objects in a given group share the same integer value of log2(s / p). In the current distributor node implementation, the unit used for s (object size) is KB (kilobytes). This means that if we have 24 LRU groups and assume p = 1 (popularity = 1) for all objects, first LRU group will contain objects of size 1 - 2 KB, second one 2 - 4 KB etc. up until 24-th group with objects of size 8 - 16 GB (or 2^23 KB - 2^24 KB).
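
The grouping rule can be illustrated with a short calculation. This is a sketch with a hypothetical function name; it uses zero-based group indices (index 0 is the "first" group) and clamps values outside the range into the first or last group, which is an assumption not stated in the text.

```typescript
// Zero-based LRU group index for an object: the integer part of log2(s / p),
// with s in KB. Clamping to [0, groupCount - 1] is an assumption.
function lruGroupNumber(sizeKB: number, popularity: number, groupCount: number = 24): number {
  const raw = Math.floor(Math.log2(sizeKB / popularity));
  return Math.min(groupCount - 1, Math.max(0, raw));
}

const firstGroup = lruGroupNumber(1.5, 1); // 0: sizes of 1 - 2 KB with p = 1
const secondGroup = lruGroupNumber(3, 1); // 1: sizes of 2 - 4 KB with p = 1
const unpopular = lruGroupNumber(1500, 1); // 10
const popular = lruGroupNumber(1500, 4); // 8: same object after 4 requests
const lastGroup = lruGroupNumber(10 * 1024 * 1024, 1); // 23: a 10 GB object
```

Note how a growing popularity moves an object towards lower-numbered groups, just as a smaller size would.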

When the object is being requested, we're incrementing its p and checking the current value of log2(s / p). Then we're calling SetA.delete(object) + SetB.add(object) (either moving the item to a different LRU set based on current log2(s / p), in which case SetA !== SetB, or just moving the item to the "top" of the current set, in which case SetA === SetB). All of those operations are very quick and don't require any costly iterations.

In order to find the best eviction candidate, we're taking the "bottom" item from each LRU set and then choosing the element with the highest t * s / p (which is also a low-cost operation, considering we need only ~24 LRU groups).
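
The request and eviction operations described above can be sketched with one insertion-ordered Map per LRU group. This is a simplified illustration, not the node's actual implementation: all names are hypothetical, popularity is assumed to start at 1, and out-of-range group numbers are clamped.

```typescript
interface CachedItem {
  sizeKB: number; // s
  popularity: number; // p
  lastAccessTime: number; // used to derive t
}

class LruSpCache {
  // One insertion-ordered Map per group: first key = "bottom", last key = "top".
  private groups: Map<string, CachedItem>[] = [];
  private groupNumberByObjectId = new Map<string, number>();

  constructor(groupCount: number = 24) {
    for (let i = 0; i < groupCount; i++) this.groups.push(new Map<string, CachedItem>());
  }

  private groupNumber(item: CachedItem): number {
    const raw = Math.floor(Math.log2(item.sizeKB / item.popularity));
    return Math.min(this.groups.length - 1, Math.max(0, raw)); // clamping is an assumption
  }

  // Registers a request for an object; sizeKB is only used when the object is new.
  access(objectId: string, sizeKB: number, now: number): void {
    let item: CachedItem = { sizeKB, popularity: 1, lastAccessTime: now };
    const oldGroup = this.groupNumberByObjectId.get(objectId);
    if (oldGroup !== undefined) {
      const existing = this.groups[oldGroup].get(objectId);
      if (existing) {
        item = { sizeKB: existing.sizeKB, popularity: existing.popularity + 1, lastAccessTime: now };
      }
      this.groups[oldGroup].delete(objectId); // SetA.delete(object)
    }
    const newGroup = this.groupNumber(item);
    this.groups[newGroup].set(objectId, item); // SetB.add(object): lands on "top"
    this.groupNumberByObjectId.set(objectId, newGroup);
  }

  // Compares only the "bottom" item of each group and returns the costliest one.
  evictionCandidate(now: number): string | undefined {
    let bestId: string | undefined;
    let bestCost = -Infinity;
    for (let i = 0; i < this.groups.length; i++) {
      const bottom = this.groups[i].entries().next();
      if (bottom.done) continue; // empty group
      const [objectId, item] = bottom.value;
      const cost = ((now - item.lastAccessTime) * item.sizeKB) / item.popularity; // t * s / p
      if (cost > bestCost) {
        bestCost = cost;
        bestId = objectId;
      }
    }
    return bestId;
  }

  evict(objectId: string): void {
    const group = this.groupNumberByObjectId.get(objectId);
    if (group === undefined) return;
    this.groups[group].delete(objectId);
    this.groupNumberByObjectId.delete(objectId);
  }
}
```

Because Map preserves insertion order, deleting and re-inserting a key moves it to the "top" of its group without any iteration, and finding a candidate only inspects one item per group.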

Cache cleanup

No-longer-distributed data objects are dropped from the cache periodically every intervals.cacheCleanup seconds. During this time the distributor node will fetch all its current on-chain obligations using the query node and drop any objects that are part of the cache but not part of the obligations from both the cache state and filesystem.
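
The selection step of the cleanup amounts to a set difference between the cached objects and the current obligations. A minimal sketch, with a hypothetical function name and with object ids modelled as strings:

```typescript
// Returns the ids that are cached locally but no longer part of the node's
// on-chain distribution obligations (as fetched from the query node).
function findObjectsToDrop(cachedObjectIds: string[], obligationObjectIds: string[]): string[] {
  const obligations = new Set<string>(obligationObjectIds);
  return cachedObjectIds.filter((objectId) => !obligations.has(objectId));
}

const toDrop = findObjectsToDrop(["1", "2", "3"], ["2", "4"]); // ["1", "3"]
```

Each returned id would then be removed from both the cache state and the filesystem.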

Logging

The distributor node supports detailed logging with the winston library. NPM log levels are used to specify the log priority.

The logs can be directed to some of the 3 available outputs, depending on the logs configuration settings:

Query node integration

Because the distributor node is making requests to a query node:

In order to achieve the best performance, it is recommended to either run the query-node processor and graphql server on the same machine the distributor node will be running on, or use a query node endpoint that can be accessed with minimal latency.

Taking the docker-compose.yml example, the services that could be run on the same machine may include:

  • db
  • processor
  • graphql-server
  • distributor-node

The INDEXER_ENDPOINT_URL can point to a completely external indexer endpoint, as the latency between processor and indexer is less of an issue in this case.