Python implementation of Bluesky PDS and AT Protocol, including data repository, Merkle search tree, and XRPC methods.
You can build your own PDS on top of arroba with just a few lines of Python and run it in any WSGI server. You can build a more involved PDS with custom logic and behavior. Or you can build a different ATProto service, eg an AppView, relay (née BGS), or something entirely new!
Install from PyPI with pip install arroba.
Arroba is the Spanish word for the @ character ("at sign").
License: This project is placed in the public domain. You may also use it under the CC0 License.
Here's minimal example code for a multi-repo PDS on top of arroba and Flask:
```python
from flask import Flask
from google.cloud import ndb
from lexrpc.flask_server import init_flask

from arroba import server
from arroba.datastore_storage import DatastoreStorage
from arroba.firehose import send_events

# for Google Cloud Datastore
ndb_client = ndb.Client()

server.storage = DatastoreStorage(ndb_client=ndb_client)
server.repo.callback = lambda _: send_events()  # to subscribeRepos

app = Flask('my-pds')
init_flask(server.server, app)

# run every request inside an ndb context, which Datastore operations require
def ndb_context_middleware(wsgi_app):
    def wrapper(environ, start_response):
        with ndb_client.context():
            return wsgi_app(environ, start_response)
    return wrapper

app.wsgi_app = ndb_context_middleware(app.wsgi_app)
```
See app.py for a more comprehensive example, including a CORS handler for OPTIONS preflight requests and a catch-all app.bsky.* XRPC handler that proxies requests to the AppView.
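For illustration only, here is a framework-agnostic sketch of those two pieces, written as plain WSGI so it could wrap app.wsgi_app the same way ndb_context_middleware does. It is not the code in app.py: the names cors_preflight_middleware and appview_url, the permissive CORS header values, and reading APPVIEW_HOST with its documented default are all assumptions, and actually forwarding the request (with its auth headers) to the AppView is left out.

```python
import os
from urllib.parse import urlunsplit

# Hypothetical, permissive CORS headers; tighten these for a real deployment.
CORS_HEADERS = [
    ('Access-Control-Allow-Origin', '*'),
    ('Access-Control-Allow-Methods', '*'),
    ('Access-Control-Allow-Headers', '*'),
]


def cors_preflight_middleware(wsgi_app):
    """Answers OPTIONS preflight requests itself, passes everything else through."""
    def wrapper(environ, start_response):
        if environ.get('REQUEST_METHOD') == 'OPTIONS':
            start_response('200 OK', CORS_HEADERS + [('Content-Length', '0')])
            return [b'']
        # Note: non-preflight responses also need Access-Control-Allow-Origin,
        # which this sketch does not add.
        return wsgi_app(environ, start_response)
    return wrapper


def appview_url(path, query='', host=None):
    """Returns the AppView URL for an app.bsky.* XRPC path, or None if it isn't one."""
    prefix = '/xrpc/'
    if not path.startswith(prefix) or not path[len(prefix):].startswith('app.bsky.'):
        return None
    # Assumption: fall back to the documented APPVIEW_HOST default.
    host = host or os.environ.get('APPVIEW_HOST', 'api.bsky-sandbox.dev')
    return urlunsplit(('https', host, path, query, ''))
```

With the Flask example, the middleware would be applied like the ndb one: app.wsgi_app = cors_preflight_middleware(app.wsgi_app).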
Arroba consists of these parts:

* Storage:
  * Storage abstract base class
  * DatastoreStorage (uses Google Cloud Datastore)
* did: create and resolve did:plcs, did:webs, and domain handles
* diff: find the deterministic minimal difference between two MSTs
* util: miscellaneous utilities for TIDs, AT URIs, signing and verifying signatures, generating JWTs, encoding/decoding, and more

Configure arroba with these environment variables:
* APPVIEW_HOST, default api.bsky-sandbox.dev
* RELAY_HOST, default bgs.bsky-sandbox.dev
* PLC_HOST, default plc.bsky-sandbox.dev
* PDS_HOST, where you're running your PDS

Optional, only used in com.atproto.repo, .server, and .sync XRPC handlers:

* REPO_TOKEN, static token to use as both accessJwt and refreshJwt, defaults to contents of repo_token file. Not required to be an actual JWT. If not set, XRPC methods that require auth will return HTTP 501 Not Implemented.
* ROLLBACK_WINDOW, number of events to serve in the subscribeRepos rollback window, as an integer. Defaults to 50k.
* PRELOAD_WINDOW, number of events to preload into the subscribeRepos rollback window at startup, as an integer. Defaults to 4k.
* SUBSCRIBE_REPOS_BATCH_DELAY, minimum time to wait between datastore queries in com.atproto.sync.subscribeRepos, in seconds, as a float. Defaults to 0 if unset.

Changelog:

* xrpc_sync.subscribe_repos now includes covering proof blocks and new prev and prevData fields.
* MST:
  * New cids_for_path, add_covering_proofs methods.
* Repo:
  * apply_writes: skip no-op update operations where the new record value is the same as the existing stored record. (No-op updates are evidently illegal in ATProto.)
  * Emit a #sync event when a new repo is created.
* Storage:
  * read_events_by_seq: always include the MST root block in every commit event.
* xrpc_sync:
  * Rework subscribeRepos to unify event stream generation across all subscribers. This significantly improves scalability and reduces CPU and I/O to near constant, with minimal additional overhead per subscriber (#52).

Breaking changes:
* repo:
  * apply_commit, apply_writes: raise an exception if the repo is inactive.
* storage:
  * create_repo: remove signing_key and rotation_key kwargs, read them from the input repo instead.
  * load_repo: don't raise an exception if the repo is tombstoned.
* datastore_storage:
  * Stop storing AtpBlock.decoded in the datastore; it's now just an in-memory @property.
* util:
  * Rename TombstonedRepo to InactiveRepo.

Non-breaking changes:
* datastore_storage:
  * DatastoreStorage:
    * New ndb_context_kwargs constructor kwarg.
    * apply_commit: handle deactivated repos.
    * create_repo: propagate Repo.status into AtpRepo.
  * AtpRemoteBlob:
    * get_or_create: drop datastore transaction.
    * New width and height properties, populated for images and videos, to be used in image/video embed aspectRatio (snarfed/bridgy-fed#1571).
    * Raise ValidationError on videos over 3 minutes.
* did:
  * New get_signing_key, get_handle functions.
  * create_plc: remove trailing slash from services.atproto_pds.endpoint.
* storage:
  * Storage: add new write_blocks method, implement in MemoryStorage and DatastoreStorage.
* xrpc_repo:
  * describe_server: include all app.bsky collections and others like chat.bsky.actor.declaration; fetch and include DID doc.
  * Implement com.atproto.repo.importRepo.
* xrpc_sync:
  * get_blob: set Cache-Control to cache for 1h.
  * list_repos:
    * rev, not integer sequence number.
    * null if the account is active.

Breaking changes:
* storage:
  * Change Storage.write to return Block instead of CID.

Non-breaking changes:
* did:
  * New update_plc method.
  * create_plc: add new also_known_as kwarg.
  * resolve_handle: drop Content-Type: text/plain requirement for HTTPS method.
* mst:
  * New start kwarg to load_all.
* repo:
  * subscribeRepos when creating new repos.
* storage:
  * New deactivate_repo, activate_repo, and write_event methods.
  * New repo kwarg to read_blocks_by_seq and read_events_by_seq to limit returned results to a single repo.
* datastore_storage:
  * New max_size and accept_types kwargs to AtpRemoteBlob.get_or_create for the blob's maxSize and accept parameters in its lexicon. If the fetched file doesn't satisfy those constraints, raises lexrpc.ValidationError.
  * DatastoreStorage.read_blocks_by_seq: use strong consistency for datastore query. May fix occasional AssertionError when serving subscribeRepos.
* xrpc_sync:
  * Switch getBlob from returning HTTP 302 to 301.
  * Support the since param in getRepo.
  * subscribeRepos: wait up to 60s on a skipped sequence number before giving up and emitting it as a gap.
* util:
  * service_jwt: add new **claims parameter for additional JWT claims, eg lxm.

Breaking changes:
* datastore_storage:
  * DatastoreStorage: add new required ndb_client kwarg to constructor, used to get new context in lexrpc websocket subscription handlers that run server methods like subscribeRepos in separate threads (snarfed/lexrpc#8).
  * DatastoreStorage.read_blocks_by_seq: if the ndb context gets closed while we're still running, log a warning and return. (This can happen in eg flask_server if the websocket client disconnects early.)
  * AtpRemoteBlob: if the blob URL doesn't return the Content-Type header, infer type from the URL, or fall back to application/octet-stream (bridgy-fed#1073).
* did:
  * Cache resolve_plc, resolve_web, and resolve_handle for 6h, up to 5000 total results per call.
* storage: rename Storage.read_commits_by_seq to read_events_by_seq for new account tombstone support.
* xrpc_sync: rename send_new_commits to send_events, ditto.
* xrpc_repo: stop requiring auth for read methods: getRecord, listRecords, describeRepo.

Non-breaking changes:
* did:
  * New HANDLE_RE regexp for handle validation.
* storage:
  * New Storage.tombstone_repo method, implemented in MemoryStorage and DatastoreStorage. Used to delete accounts. (bridgy-fed#783)
  * New Storage.load_repos method, implemented in MemoryStorage and DatastoreStorage. Used for com.atproto.sync.listRepos.
* util:
  * service_jwt: add optional aud kwarg.
* xrpc_sync:
  * subscribeRepos:
    * New ROLLBACK_WINDOW environment variable to limit size of rollback window. Defaults to no limit.
    * time instead of the current time (snarfed/bridgy-fed#1015).
  * Handle getRepo queries with the since parameter. since still isn't actually implemented, but we now serve the entire repo instead of returning an error.
  * Implement getRepoStatus method.
  * Implement listRepos method.
  * getRepo bug fix: include the repo head commit block.
* xrpc_repo:
  * getRecord: encode returned records correctly as ATProto-flavored DAG-JSON.
* xrpc_*: return RepoNotFound and RepoDeactivated errors when appropriate (snarfed/bridgy-fed#1083).

* at:// URIs, commit revs, etc. Before, we were using the integer UNIX timestamp directly, which happened to be the same 13 character length. Oops.
* Rename BGS_HOST environment variable to RELAY_HOST. BGS_HOST is still supported for backward compatibility.
* datastore_storage:
  * DatastoreStorage.last_seq, handle new NSID.
  * New AtpRemoteBlob class for storing "remote" blobs, available at public HTTP URLs, that we don't store ourselves.
* did:
  * create_plc: strip padding from genesis operation signature (for did-method-plc#54, atproto#1839).
  * resolve_handle: return None on bad domain, eg .foo.com.
  * resolve_handle bug fix: handle charset specifier in HTTPS method response Content-Type.
* util:
  * new_key: add seed kwarg to allow deterministic key generation.
* xrpc_repo:
  * getRecord: try to load record locally first; if not available, forward to AppView.
* xrpc_sync:
  * Implement getBlob, right now only based on "remote" blobs stored in AtpRemoteBlobs in datastore storage.

* The subscribeRepos sequence number is reused as the new rev field in commits.
* New did module with utilities to create and resolve did:plcs and resolve did:webs.
* New util.service_jwt function that generates ATProto inter-service JWTs.
* Repo:
  * New signing_key/rotation_key attributes. Generate, store, and load both in datastore_storage.
  * Remove format_init_commit, migrate existing calls to format_commit.
* Storage:
  * Rename read_from_seq to read_blocks_by_seq (and in MemoryStorage and DatastoreStorage), add new read_commits_by_seq method.
  * Merge load_repo's did/handle kwargs into did_or_handle.
* Make subscribeRepos check storage for all new commits every time it wakes up.
* Replace xrpc_sync.enqueue_commit with new send_new_commits function that takes no parameters.
* Drop app.bsky/com.atproto lexicons, use lexrpc's instead.

Big milestone: arroba is successfully federating with the ATProto sandbox! See app.py for the minimal demo code needed to wrap arroba in a fully functional PDS.

* Add com.atproto XRPC methods needed to federate with sandbox, including most of repo and sync.
* Implement subscribeRepos server side over websocket.

Implement repo and commit chain in new Repo class, including pluggable storage. This completes the first pass at all PDS data structures. Next release will include initial implementations of the com.atproto.sync.* XRPC methods.
Initial release! Still very in progress. MST, Walker, and Diff classes are mostly complete and working. Repo, commits, and sync XRPC methods are still in progress.
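The truncated changelog entry above about at:// URIs and commit revs appears to concern TIDs (timestamp identifiers), which the util module generates, and it explains why a 13-digit UNIX timestamp only looked right. As a minimal sketch of the format described in the ATProto TID spec (not arroba's util code): a TID is a 64-bit integer whose top bit is 0, followed by 53 bits of microseconds since the UNIX epoch and a 10-bit clock identifier, written as 13 characters of base32-sortable. The names encode_tid, decode_tid, and new_tid are hypothetical.

```python
import random
import time

# Base32-sortable alphabet from the ATProto TID spec. Its characters are in
# ASCII order, so comparing TID strings matches comparing their integers.
B32_SORTABLE = '234567abcdefghijklmnopqrstuvwxyz'


def encode_tid(micros, clock_id):
    """Packs a timestamp (microseconds) and a 10-bit clock ID into a 13-char TID."""
    if not 0 <= micros < 2**53:
        raise ValueError('timestamp must fit in 53 bits')
    if not 0 <= clock_id < 2**10:
        raise ValueError('clock identifier must fit in 10 bits')
    n = (micros << 10) | clock_id  # 64-bit value whose top bit is always 0
    chars = []
    for _ in range(13):  # 13 characters * 5 bits covers all 64 bits
        chars.append(B32_SORTABLE[n & 0x1F])
        n >>= 5
    return ''.join(reversed(chars))  # most significant character first


def decode_tid(tid):
    """Inverse of encode_tid: returns (micros, clock_id)."""
    n = 0
    for char in tid:
        n = (n << 5) | B32_SORTABLE.index(char)
    return n >> 10, n & 0x3FF


def new_tid():
    """Hypothetical helper: a TID for the current time with a random clock ID."""
    return encode_tid(time.time_ns() // 1000, random.randrange(1024))
```

A raw millisecond timestamp has the same length but uses only decimal digits and none of this bit layout, so it isn't a valid TID even though it looks like one at a glance.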
Here's how to package, test, and ship a new release.
1. Remove the cursor 14675627 hack in xrpc_sync.subscribe_repos!
2. Run the unit tests.
   ```sh
   source local/bin/activate.csh
   python -m unittest discover
   python -m unittest arroba.tests.mst_test_suite  # more extensive, slower tests (deliberately excluded from autodiscovery)
   ```
3. Bump the version number in pyproject.toml and docs/conf.py. git grep the old version number to make sure it only appears in the changelog. Change the current changelog entry in README.md for this new version from unreleased to the current date.
4. Build the docs. If you added any new modules, add them to the appropriate file(s) in docs/source/. Then run ./docs/build.sh. Check that the generated HTML looks fine by opening docs/_build/html/index.html and looking around.
5. Commit the release:
   ```sh
   setenv ver X.Y
   git commit -am "release v$ver"
   ```
6. Upload to test.pypi.org for testing.
   ```sh
   python -m build
   twine upload -r pypitest dist/arroba-$ver*
   ```
7. Install from test.pypi.org.
   ```sh
   cd /tmp
   python -m venv local
   source local/bin/activate.csh
   # make sure we force pip to use the uploaded version
   pip uninstall arroba
   pip install --upgrade pip
   pip install -i https://test.pypi.org/simple --extra-index-url https://pypi.org/simple arroba==$ver
   deactivate
   ```
8. Smoke test that the code trivially loads and runs. Activate the virtualenv and start Python:
   ```sh
   source local/bin/activate.csh
   python
   ```
   Then, in the interpreter:
   ```python
   from arroba import did
   did.resolve_handle('snarfed.org')
   ```
   Exit the interpreter and run deactivate.
9. Tag the release in git. In the tag message editor, delete the generated comments at the bottom, leave the first line blank (to omit the release "title" in GitHub), put ### Notable changes on the second line, then copy and paste this version's changelog contents below it.
   ```sh
   git tag -a v$ver --cleanup=verbatim
   git push && git push --tags
   ```
10. Draft a new release on GitHub. Enter vX.Y in the Tag version box. Leave Release title empty. Copy ### Notable changes and the changelog contents into the description text box.
11. Upload to pypi.org!
    ```sh
    twine upload dist/arroba-$ver*
    ```
12. Wait for the docs to build on Read the Docs, then check that they look ok.
13. On the Versions page, check that the new version is active. If it's not, activate it in the Activate a Version section.