Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/refresh-all-issues
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env node
import { default as debugInit } from '../lib/debug.js';
import refresh, { reportFailures } from '../lib/refresh.js';
import { createPacedRefresh, reportFailures } from '../lib/refresh.js';
import selectReposArg from '../lib/select-repos-arg.js';
import db from '../lib/db.js';

debugInit('pulldasher:refresh*');

const refresh = createPacedRefresh();

refresh.allIssues(selectReposArg())
.done(function (result) {
reportFailures(result);
Expand Down
4 changes: 3 additions & 1 deletion bin/refresh-all-pulls
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env node
import { default as debugInit } from '../lib/debug.js';
import refresh, { reportFailures } from '../lib/refresh.js';
import { createPacedRefresh, reportFailures } from '../lib/refresh.js';
import selectReposArg from '../lib/select-repos-arg.js';
import db from '../lib/db.js';

debugInit('pulldasher:refresh*');

const refresh = createPacedRefresh();

refresh.allPulls(selectReposArg())
.done(function (result) {
reportFailures(result);
Expand Down
4 changes: 3 additions & 1 deletion bin/refresh-open-issues
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env node
import { default as debugInit } from '../lib/debug.js';
import refresh, { reportFailures } from '../lib/refresh.js';
import { createPacedRefresh, reportFailures } from '../lib/refresh.js';
import selectReposArg from '../lib/select-repos-arg.js';
import db from '../lib/db.js';

debugInit('pulldasher:refresh*');

const refresh = createPacedRefresh();

refresh.openIssues(selectReposArg())
.done(function (result) {
reportFailures(result);
Expand Down
4 changes: 3 additions & 1 deletion bin/refresh-open-pulls
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#!/usr/bin/env node
import { default as debugInit } from '../lib/debug.js';
import refresh, { reportFailures } from '../lib/refresh.js';
import { createPacedRefresh, reportFailures } from '../lib/refresh.js';
import selectReposArg from '../lib/select-repos-arg.js';
import db from '../lib/db.js';

debugInit('pulldasher:refresh*');

const refresh = createPacedRefresh();

refresh.openPulls(selectReposArg())
.done(function (result) {
reportFailures(result);
Expand Down
5 changes: 5 additions & 0 deletions config.example.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ module.exports = {
callbackURL: "http://localhost:" + port + "/auth/github/callback",
// An API token for the backend to make API requests with.
token: "oauth api token for server-side api calls",
// How much of the token's hourly GitHub API quota to reserve for live
// webhook/socket traffic. Bulk backfills (bin/refresh-*) pace themselves to
// stay above this floor and pause when remaining quota hits it, so a large
// backfill never starves normal operation. Defaults to 1000 if omitted.
bulkReserve: 1000,
// This will need to be the same secret you use on the Webhooks page for
// the repo Pulldasher is going to monitor.
hook_secret:
Expand Down
79 changes: 69 additions & 10 deletions lib/git-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Label from "../models/label.js";
import Status from "../models/status.js";
import Signature from "../models/signature.js";
import getLogin from "./get-user-login.js";
import { noopPacer } from "./pacer.js";

const MyOctokit = Octokit.plugin(throttling, retry);
const gitDebug = debug("pulldasher:github");
Expand Down Expand Up @@ -74,6 +75,34 @@ github.hook.error("request", (error, options) => {
throw error;
});

/**
 * Hook a pacer up to the shared GitHub client so it is handed the quota
 * headers of every response. That lets it pace bulk work against what is
 * actually being spent, including the per-pull/issue fan-out inside
 * `parse`/`parseIssue` that makes up most of a backfill's cost and that the
 * caller has no way to observe on its own.
 *
 * Intended for the CLI backfill bins only, called once at startup with their
 * pacer. The server never calls it, so webhook/socket responses are not
 * tracked and nothing there is paced.
 *
 * The hooks are installed at most once per process. They close over the pacer
 * from the first call; a later call would just stack a second set of observers
 * on the same client and double-count every response, so it is ignored.
 *
 * @param {{observe: function(Object): void}} pacer - receives each response's headers.
 */
let rateLimitObserved = false;
export function observeRateLimit(pacer) {
  if (!rateLimitObserved) {
    // Flip the flag before touching the client so a repeat call is a no-op.
    rateLimitObserved = true;

    const reportQuota = (headers) => pacer.observe(headers);

    github.hook.after("request", (response) => {
      reportQuota(response.headers);
    });

    github.hook.error("request", (error) => {
      // Even a failed request carries the current quota, when it got a response.
      const { response } = error;
      reportQuota(response && response.headers);
      throw error;
    });
  }
}

const githubRest = github.rest;

export default {
Expand All @@ -98,9 +127,9 @@ export default {
*
* Returns a promise which resolves to an array of all open pull requests
*/
getOpenPulls: function (repo) {
getOpenPulls: function (repo, pacer = noopPacer) {
return logErrors(
github.paginate(githubRest.pulls.list, params({ state: "open" }, repo)),
pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo), pacer),
"Getting open pulls in repo %s",
repo
);
Expand All @@ -111,9 +140,9 @@ export default {
*
* Returns a promise which resolves to an array of all pull requests
*/
getAllPulls: function (repo) {
getAllPulls: function (repo, pacer = noopPacer) {
return logErrors(
github.paginate(githubRest.pulls.list, params({ state: "all" }, repo)),
pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo), pacer),
"Getting all pulls in repo %s",
repo
);
Expand All @@ -131,11 +160,10 @@ export default {
*
* Returns a promise which resolves to an array of all open issues
*/
getOpenIssues: function (repo) {
getOpenIssues: function (repo, pacer = noopPacer) {
const searchParams = params({ state: "open" }, repo);
return logErrors(
github
.paginate(githubRest.issues.listForRepo, searchParams)
pacedPaginate(githubRest.issues.listForRepo, searchParams, pacer)
.then(filterOutPulls)
.then(addRepo(searchParams)),
"Getting open issues in repo %s",
Expand All @@ -148,11 +176,10 @@ export default {
*
* Returns a promise which resolves to an array of all issues
*/
getAllIssues: function (repo) {
getAllIssues: function (repo, pacer = noopPacer) {
const searchParams = params({ state: "all" }, repo);
return logErrors(
github
.paginate(githubRest.issues.listForRepo, searchParams)
pacedPaginate(githubRest.issues.listForRepo, searchParams, pacer)
.then(filterOutPulls)
.then(addRepo(searchParams)),
"Getting all issues in repo %s",
Expand Down Expand Up @@ -645,3 +672,35 @@ function logErrors(promise, ...messageAndArgs) {
throw err;
});
}

/**
 * Collect every item of a paginated bulk listing, waiting on the pacer between
 * pages so that listing a whole repo cannot burn through the quota in a burst.
 *
 * The first page is fetched without waiting and gives the pacer its initial
 * quota view (the bin's observeRateLimit hook records each response). After
 * that, each `gate()` spaces out the fetch of the following page. The gate
 * guards a fetch, so it is skipped once the last page has arrived: waiting
 * there would only hold back the result (or, below the reserve floor, stall
 * for a whole reset window) with nothing left to pace.
 *
 * The caller supplies the `pacer`. The CLI backfill bins pass a real one;
 * every other caller gets the default no-op and fetches unpaced (the server's
 * startup open-pulls listing, and the live webhook/socket path, which never
 * lists whole repos anyway).
 *
 * @param {Function} route - Octokit endpoint method to paginate.
 * @param {Object} parameters - request parameters for that endpoint.
 * @param {{gate: function(): Promise}} [pacer=noopPacer] - gate awaited between pages.
 * @returns {Promise<Array>} the items of all pages, in page order.
 */
async function pacedPaginate(route, parameters, pacer = noopPacer) {
  const collected = [];
  const pages = github.paginate.iterator(route, parameters);

  for await (const page of pages) {
    for (const entry of page.data) {
      collected.push(entry);
    }

    // Nothing more to fetch: do not wait, just fall out of the loop.
    if (!hasNextPage(page)) {
      continue;
    }
    await pacer.gate();
  }

  return collected;
}

/**
 * Tell whether a paginated response points at another page, using the
 * `rel="next"` entry of its Link header — the same signal Octokit's own
 * iterator uses to decide whether to continue.
 *
 * @param {{headers: ?Object}} response - a single page's response.
 * @returns {boolean} true when the Link header advertises a next page.
 */
function hasNextPage(response) {
  const { headers } = response;
  const link = headers && headers.link;

  // No headers, or no (non-empty) Link header: this was the only/last page.
  if (!link) {
    return false;
  }
  return link.includes('rel="next"');
}
147 changes: 147 additions & 0 deletions lib/pacer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import config from "./config-loader.js";
import debug from "./debug.js";
import Promise from "bluebird";

// Debug logger for pacing decisions, under the "pulldasher:pacer" namespace.
const pacerDebug = debug("pulldasher:pacer");

// Minimum even-spread delay, in milliseconds, that `gate` bothers to log. Only
// a paced delay long enough to explain a visibly slow backfill is reported —
// short spacing isn't worth the noise.
const LOG_DELAY_THRESHOLD_MS = 5000;

/**
 * Build a pacer: an object that proactively spaces out bulk GitHub requests so
 * a backfill gives way to live traffic rather than running the shared token's
 * quota down to zero. A pacer is a per-process instance that the entry point
 * constructs and installs. Only the CLI backfill bins create a real one with
 * this function; the server installs nothing and runs against `noopPacer`, so
 * live webhook/socket refreshes are never delayed. Rate state belongs to the
 * token as a whole, so inside one process every bulk call gates against the
 * same view, which `observe` refreshes on every response (live calls
 * included).
 *
 * The returned object has two methods:
 *
 * `observe(headers)` — remember the latest quota reported by any response
 * (free; a missing `headers` is ignored).
 * `gate()` — await this ahead of each *bulk* unit of work (a consumer drain or
 * a pagination page); it spreads bulk work over the reset window and blocks
 * outright while the quota is below the reserve floor.
 *
 * Pacing follows real consumption rather than the number of gates: every gate
 * moves a token-bucket cursor forward by the requests spent since the previous
 * gate (the `x-ratelimit-used` delta). A pull whose processing fans out to a
 * dozen calls therefore waits a dozen intervals, and a live-traffic spike,
 * which raises `used` as well, pushes the cursor out and makes bulk yield.
 *
 * @param {Object} [options]
 * @param {number} [options.reserve] - quota floor kept free for live traffic;
 *   defaults to `config.github.bulkReserve`, or 1000 when that is unset.
 * @returns {{observe: function(Object): void, gate: function(): Promise}}
 */
export function createPacer({ reserve = config.github.bulkReserve ?? 1000 } = {}) {
  // Latest quota view, as reported by response headers.
  const quota = { remaining: null, reset: null, used: null };
  // Token-bucket cursor and the `used` reading it was last advanced from.
  let nextSlot = 0;
  let lastUsed = null;

  // Which response header feeds which field of the quota view.
  const quotaHeaders = [
    ["remaining", "x-ratelimit-remaining"],
    ["reset", "x-ratelimit-reset"],
    ["used", "x-ratelimit-used"],
  ];

  function observe(headers) {
    if (!headers) {
      return;
    }
    for (const [field, headerName] of quotaHeaders) {
      const raw = headers[headerName];
      // A response that omits a header leaves the previous reading in place.
      if (raw !== undefined) {
        quota[field] = Number(raw);
      }
    }
  }

  function gate() {
    const pace = computePace({
      remaining: quota.remaining,
      reset: quota.reset,
      used: quota.used,
      reserve,
      now: Date.now(),
      nextSlot,
      lastUsed,
    });
    ({ nextSlot, lastUsed } = pace);

    const { delayMs } = pace;
    const { remaining } = quota;
    // A positive delay at or below the floor is the reset pause; any other
    // positive delay is even-spread pacing.
    const atFloor = remaining != null && remaining <= reserve;

    if (delayMs > 0 && atFloor) {
      pacerDebug(
        "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset",
        remaining,
        reserve,
        Math.round(delayMs / 1000)
      );
    } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) {
      pacerDebug(
        "pacing bulk: waiting %sms (remaining %s)",
        Math.round(delayMs),
        remaining
      );
    }

    return Promise.delay(delayMs);
  }

  return { observe, gate };
}

// Null Object pacer, used by the server and the webhook/socket path. Both
// methods accept any call and do nothing, which lets call sites stay
// unconditional (no null guards on the hot Octokit hooks or the queue
// consumer): `observe` discards its input and `gate` never waits.
export const noopPacer = {
  observe() {},
  gate() {
    return Promise.resolve();
  },
};

/**
 * Pure pacing decision. Given the latest quota view and a token-bucket cursor,
 * returns how long the next bulk request should wait, the advanced cursor, and
 * the `used` baseline to diff against next time.
 *
 * - unknown quota (`null`/`undefined`, or a non-finite value such as the `NaN`
 *   an unparseable header turns into), or a `reset` already in the past (stale
 *   window) → allow immediately; the next response refreshes the view.
 * - `remaining ≤ reserve` → pause until `reset` (let the window refill).
 * - otherwise → even-spread: the spendable budget (`remaining − reserve`)
 *   divided across the time left in the window gives the per-request interval.
 *   Advance the cursor by `spent × interval`, where `spent` is the requests
 *   consumed since the last gate (`used − lastUsed`), so the spacing tracks
 *   real consumption — a multi-call item or a live spike pushes the next slot
 *   further out. A shrinking `remaining` also widens the interval, so bulk
 *   yields on both signals. The slot never lands later than `reset`: the quota
 *   refills there, so waiting longer would stall the backfill for nothing
 *   (the floor pause above waits no longer than that either).
 *
 * @param {Object} state
 * @param {?number} state.remaining - latest `x-ratelimit-remaining`, if known.
 * @param {?number} state.reset - latest `x-ratelimit-reset`, in epoch seconds.
 * @param {?number} state.used - latest `x-ratelimit-used`, if known.
 * @param {number} state.reserve - quota floor bulk work must stay above.
 * @param {number} state.now - current time, in epoch milliseconds.
 * @param {number} state.nextSlot - cursor returned by the previous call (0 initially).
 * @param {?number} state.lastUsed - `used` baseline returned by the previous call.
 * @returns {{delayMs: number, nextSlot: number, lastUsed: ?number}} the wait in
 *   milliseconds, the advanced cursor, and the new `used` baseline.
 */
export function computePace({
  remaining,
  reset,
  used,
  reserve,
  now,
  nextSlot,
  lastUsed,
}) {
  // A non-finite reading must count as "unknown". Otherwise a single NaN flows
  // into the slot arithmetic, is stored back as the cursor/baseline, and every
  // later `Math.max(now, NaN + …)` stays NaN — pacing silently switched off.
  const knownUsed = Number.isFinite(used) ? used : null;
  const knownLastUsed = Number.isFinite(lastUsed) ? lastUsed : null;
  const baseline = knownUsed ?? knownLastUsed;

  // No quota view, or a window that already rolled over (our `remaining` is
  // stale): proceed now and let the next response refresh the view.
  const allowNow = { delayMs: 0, nextSlot: now, lastUsed: baseline };
  if (!Number.isFinite(remaining) || !Number.isFinite(reset)) {
    return allowNow;
  }
  const resetMs = reset * 1000;
  if (resetMs <= now) {
    return allowNow;
  }
  if (remaining <= reserve) {
    return { delayMs: resetMs - now, nextSlot: resetMs, lastUsed: baseline };
  }

  const interval = (resetMs - now) / (remaining - reserve);
  // An unusable `reserve` (e.g. a non-numeric config value) leaves no interval
  // to pace with; fall back to not pacing rather than returning a NaN delay.
  if (!Number.isFinite(interval)) {
    return allowNow;
  }

  // Clamp at 0: across a window rollover the server resets `used` to ~0 while
  // our `lastUsed` still holds the pre-reset high, so the delta can go negative.
  const spent =
    knownUsed === null || knownLastUsed === null
      ? 0
      : Math.max(0, knownUsed - knownLastUsed);

  // Recover from a corrupted cursor instead of propagating it.
  const cursor = Number.isFinite(nextSlot) ? nextSlot : now;
  // Never earlier than now, never later than the reset (see the doc comment).
  const slot = Math.min(resetMs, Math.max(now, cursor + spent * interval));
  return { delayMs: slot - now, nextSlot: slot, lastUsed: baseline };
}
Loading