Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTP enabled bitswap impl #8

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
489 changes: 489 additions & 0 deletions patches/@helia+interface+1.0.0.patch

Large diffs are not rendered by default.

4,359 changes: 4,359 additions & 0 deletions patches/helia+1.0.3.patch

Large diffs are not rendered by default.

128 changes: 128 additions & 0 deletions patches/mortice+3.0.1.patch

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/components/CidRenderer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export default function CidRenderer ({ requestPath }: { requestPath: string }):
return (
<div>
<ValidationMessage pathNamespacePrefix={pathNamespacePrefix} cid={cid} requestPath={requestPath}>
<button onClick={() => { void makeRequest(false) }} className='button-reset pv3 tc bn bg-animate bg-black-80 hover-bg-aqua white pointer w-100'>Load in-page</button>
<button onClick={() => { void makeRequest() }} className='button-reset pv3 tc bn bg-animate bg-black-80 hover-bg-aqua white pointer w-100'>Load in-page</button>

<a className="pt3 db" href={swPath} target="_blank">
<button className='button-reset pv3 tc bn bg-animate bg-black-80 hover-bg-aqua white pointer w-100'>Load directly / download</button>
Expand Down
44 changes: 42 additions & 2 deletions src/get-helia.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import type { Helia } from '@helia/interface'
import { MemoryBlockstore } from 'blockstore-core'
import { LevelDatastore } from 'datastore-level'
import { MemoryDatastore } from 'datastore-core'
// import { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import { createBitswapWithHTTP } from 'ipfs-bitswap'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { identity } from 'multiformats/hashes/identity'
import { multiaddr } from '@multiformats/multiaddr'
import { peerIdFromString } from '@libp2p/peer-id'

import type { LibP2pComponents, Libp2pConfigTypes } from './types.ts'
import { getLibp2p } from './getLibp2p.ts'
Expand All @@ -28,7 +33,7 @@ const defaultOptions: GetHeliaOptions = {

export async function getHelia ({ usePersistentDatastore, libp2pConfigType }: GetHeliaOptions = defaultOptions): Promise<Helia> {
// the blockstore is where we store the blocks that make up files
const blockstore: HeliaInit['blockstore'] = new MemoryBlockstore() as unknown as HeliaInit['blockstore']
const blockstore = new MemoryBlockstore()

// application-specific data lives in the datastore
let datastore: LibP2pComponents['datastore']
Expand All @@ -44,10 +49,45 @@ export async function getHelia ({ usePersistentDatastore, libp2pConfigType }: Ge
// libp2p is the networking layer that underpins Helia
const libp2p = await getLibp2p({ datastore, type: libp2pConfigType })

const hashers: MultihashHasher[] = [
sha256,
sha512,
identity
]

const httpBitswap = createBitswapWithHTTP(libp2p, blockstore, {
bootstrapHttpOnlyPeers: [
// 'https://cloudflare-ipfs.com',
'https://ipfs.io'
],
bitswapOptions: {
hashLoader: {
getHasher: async (codecOrName: string | number) => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return await Promise.resolve(hasher)
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
}
})

// TODO remove this hardcoded bootstrap peer for the http over libp2p part
const marcoServer = multiaddr('/ip4/34.221.29.193/udp/4001/quic-v1/webtransport/certhash/uEiCuO-L9hgcyX0W8InuEddnpCZgrKM0nDuhbHmfLZS1yhg/certhash/uEiCCZxrd830q5k_tLX86jl6DK4qCTdKsH0M_T4nQGlu08Q/p2p/12D3KooWEBQi1GAUt1Ypftkvv1y2G9L2QHvjJ9A8oWRTDSnLwWLe')
await libp2p.peerStore.addressBook.add(peerIdFromString('12D3KooWEBQi1GAUt1Ypftkvv1y2G9L2QHvjJ9A8oWRTDSnLwWLe'), [marcoServer])
// TODO, this is to bootstrap a single http over libp2p server. We should get these peers from the router.
void libp2p.dial(marcoServer).then(() => { console.log('Marco server connected') }).catch(err => { console.log('error dialing marcoServer', err) })

// create a Helia node
const helia = await createHelia({
datastore: datastore as unknown as HeliaInit['datastore'],
blockstore,
bitswap: httpBitswap,
libp2p
})

Expand Down
1 change: 1 addition & 0 deletions vendor/ipfs-bitswap/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
"@libp2p/logger": "^2.0.5",
"@libp2p/topology": "^4.0.0",
"@libp2p/tracked-map": "^3.0.0",
"@marcopolo_/libp2p-fetch": "^0.0.1",
"@multiformats/multiaddr": "^12.1.0",
"@vascosantos/moving-average": "^1.1.0",
"abortable-iterator": "^4.0.2",
Expand Down
225 changes: 225 additions & 0 deletions vendor/ipfs-bitswap/src/http-bitswap.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@

import type { PeerId } from '@libp2p/interface-peer-id'
import type { AbortOptions } from '@libp2p/interfaces'
import type { CID, Version } from 'multiformats'
import type { ProgressOptions } from 'progress-events'
import type { Bitswap, WantListEntry, BitswapWantBlockProgressEvents, Ledger, Stats, BitswapOptions } from './index.js'
import type { Blockstore } from 'interface-blockstore'
import type { BitswapNetworkNotifyProgressEvents } from './network.js'
import { fetchViaDuplex } from '@marcopolo_/libp2p-fetch'
import type { Libp2p } from 'libp2p'

// TODO: this should be a different protocol id
const IPFS_GATEWAY_PROTOCOL = '/libp2p-http'

interface WithTimestamp<T> { val: T, timestamp: number }

export interface HttpBitswapOptions {
httpOverLibp2pPeersLimit?: number
bootstrapHttpOnlyPeers?: string[]
bitswapOptions?: BitswapOptions
}

export const defaultOptions = {
httpOverLibp2pPeersLimit: 5,
bootstrapHttpOnlyPeers: [],
bitswapOptions: {}
}

export class HttpBitswap implements Bitswap {
stats: Stats = this.innerBitswap.stats
peers: PeerId[] = this.innerBitswap.peers
httpOverLibp2pPeersLimit: number
httpOverLibp2pPeers: Array<WithTimestamp<PeerId>> = []
httpOnlyPeers: string[] = []

constructor (private readonly libp2p: Libp2p, private readonly innerBitswap: Bitswap, private readonly blockstore: Blockstore, options: HttpBitswapOptions = defaultOptions) {
this.httpOverLibp2pPeersLimit = options.httpOverLibp2pPeersLimit ?? defaultOptions.httpOverLibp2pPeersLimit
const bootstrapHttpOnlyPeers = options.bootstrapHttpOnlyPeers ?? defaultOptions.bootstrapHttpOnlyPeers

libp2p.peerStore.addEventListener('change:protocols', (event) => {
const { peerId, protocols } = event.detail
if (protocols.includes(IPFS_GATEWAY_PROTOCOL)) {
this.newHttpOverLibp2pPeer(peerId)
}
})

this.httpOnlyPeers = [...bootstrapHttpOnlyPeers]
}

public newHttpOnlyPeer (url: string): void {
this.httpOnlyPeers.push(url)
}

private newHttpOverLibp2pPeer (peerId: PeerId): void {
if (this.httpOverLibp2pPeers.length < this.httpOverLibp2pPeersLimit) {
this.httpOverLibp2pPeers.push({ val: peerId, timestamp: Date.now() })
return
}

let i = 0
let minTimestamp = this.httpOverLibp2pPeers[0].timestamp
let minIndex = 0
for (const { timestamp, val } of this.httpOverLibp2pPeers) {
if (val === peerId) {
return // Already have this peer
}
if (timestamp < minTimestamp) {
minTimestamp = timestamp
minIndex = i
}
i++
}
this.httpOverLibp2pPeers[minIndex] = { val: peerId, timestamp: Date.now() }
}

wantlistForPeer (peerId: PeerId): Map<string, WantListEntry> {
return this.innerBitswap.wantlistForPeer(peerId)
}

ledgerForPeer (peerId: PeerId): Ledger | undefined {
return this.innerBitswap.ledgerForPeer(peerId)
}

unwant (cids: CID<unknown, number, number, Version> | Array<CID<unknown, number, number, Version>>): void {
this.innerBitswap.unwant(cids)
}

cancelWants (cids: CID<unknown, number, number, Version> | Array<CID<unknown, number, number, Version>>): void {
this.innerBitswap.cancelWants(cids)
}

getWantlist (): IterableIterator<[string, WantListEntry]> {
return this.innerBitswap.getWantlist()
}

notify (cid: CID<unknown, number, number, Version>, block: Uint8Array, options?: ProgressOptions<BitswapNetworkNotifyProgressEvents>): void {
this.innerBitswap.notify(cid, block, options)
}

async want (
cid: CID<unknown, number, number, Version>,
options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>
): Promise<Uint8Array> {
// Start a bitswap req
const abortController = new AbortController()
if ((options?.signal) != null) {
options.signal.addEventListener('abort', () => { abortController.abort() })
}

let totalReqs = 1 + this.httpOverLibp2pPeers.length + this.httpOnlyPeers.length
let totalFailures = 0
const waitForAbortOrAllFailures = async <T>(err: T): Promise<T> => {
totalFailures++
if (totalFailures === totalReqs) {
throw err
}
// Wait for the abort so others can provide
await new Promise((resolve) => { options?.signal?.addEventListener('abort', resolve) })
return err
}

const bitswapWantPromise = this.innerBitswap.want(cid, { ...options, signal: abortController.signal })
.then((block) => {
console.log("Got block from bitswap", Date.now())
return block
})
.catch(async (err) => {
throw await waitForAbortOrAllFailures(err)
})

// Start a http req over libp2p
const httpOverLibp2pReqs = this.httpOverLibp2pPeers.map(async ({ val: peerId }) => {
try {
const conn = await this.libp2p.dial(peerId, { signal: options?.signal })
{
const s = await conn.newStream('/libp2p-http', { signal: options?.signal })
const fetch = fetchViaDuplex(s)
const resp: Response = await fetch(new Request(`https://example.com/ipfs/${cid.toString()}/`, { method: 'HEAD', headers: { 'Cache-Control': 'only-if-cached' } }))
if (!resp.ok) {
// We don't have the block here, block on the abort signal
throw new Error('Not found')
}
}

const s = await conn.newStream('/libp2p-http', { signal: options?.signal })
const fetch = fetchViaDuplex(s)
const resp: Response = await fetch(new Request(`https://example.com/ipfs/${cid.toString()}/?format=raw`))

if (resp.ok) {
const block = new Uint8Array(await resp.arrayBuffer())
await this.blockstore.put(cid, block)
console.log("Got block from http over libp2p", Date.now())
return block
}
// Otherwise, do nothing and block on the abort signal
throw new Error('Not found')
} catch (err) {
throw await waitForAbortOrAllFailures(err)
}
})

const httpOnlyReqs = this.httpOnlyPeers.map(async (url) => {
try {
{
// We should be using the Cache-Control header, but this is not a CORS allowed header on some gateways
// const resp: Response = await fetch(new Request(`${url}/ipfs/${cid.toString()}/`, { method: 'HEAD', headers: { 'Cache-Control': 'only-if-cached' } }), { signal: options?.signal })
const resp: Response = await fetch(new Request(`${url}/ipfs/${cid.toString()}/`, { method: 'HEAD' }), { signal: options?.signal })
if (!resp.ok) {
throw new Error('Not found')
}
}

const resp = await fetch(new Request(`${url}/ipfs/${cid.toString()}/?format=raw`), { signal: options?.signal })
if (resp.ok) {
const block = new Uint8Array(await resp.arrayBuffer())
await this.blockstore.put(cid, block)
console.log("Got block from http", Date.now())
return block
}
throw new Error('Not found')
} catch (err) {
throw await waitForAbortOrAllFailures(err)
}
})

// Wait for the first to finish
const block = await Promise.race([
bitswapWantPromise,
...httpOverLibp2pReqs,
...httpOnlyReqs
])
this.innerBitswap.notify(cid, block)

abortController.abort()
return block
}

isStarted (): boolean {
return this.innerBitswap.isStarted()
}

beforeStart? (): void | Promise<void> {
return this.innerBitswap.beforeStart?.()
}

start (): void | Promise<void> {
return this.innerBitswap.start()
}

afterStart? (): void | Promise<void> {
return this.innerBitswap.afterStart?.()
}

beforeStop? (): void | Promise<void> {
return this.innerBitswap.beforeStop?.()
}

stop (): void | Promise<void> {
return this.innerBitswap.stop()
}

afterStop? (): void | Promise<void> {
return this.innerBitswap.afterStop?.()
}
}
6 changes: 6 additions & 0 deletions vendor/ipfs-bitswap/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { AbortOptions } from '@libp2p/interfaces'
import type { Startable } from '@libp2p/interfaces/startable'
import type { ProgressEvent, ProgressOptions } from 'progress-events'
import type { BitswapNetworkNotifyProgressEvents, BitswapNetworkWantProgressEvents } from './network.js'
import { HttpBitswap, HttpBitswapOptions } from './http-bitswap.js'

export interface WantListEntry {
cid: CID
Expand Down Expand Up @@ -141,3 +142,8 @@ export interface BitswapOptions {
export const createBitswap = (libp2p: Libp2p, blockstore: Blockstore, options: BitswapOptions = {}): Bitswap => {
return new DefaultBitswap(libp2p, blockstore, options)
}

export const createBitswapWithHTTP = (libp2p: Libp2p, blockstore: Blockstore, options: HttpBitswapOptions = {}): Bitswap => {
const inner = new DefaultBitswap(libp2p, blockstore, options.bitswapOptions)
return new HttpBitswap(libp2p, inner, blockstore, options)
}
7 changes: 7 additions & 0 deletions webpack.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,16 @@ const common = {
})
],

ignoreWarnings: [/Failed to parse source map/],

// Determine how modules within the project are treated
module: {
rules: [
{
test: /\.js$/,
enforce: 'pre',
use: ['source-map-loader'],
},
// JavaScript: Use Babel to transpile JavaScript files
{
test: /\.[jt]sx?$/,
Expand Down