embarrassingly parallel for all three steps of sync

This commit is contained in:
fyears 2022-03-19 01:21:22 +08:00
parent 6a820ddfdd
commit f73e6b7796
3 changed files with 138 additions and 79 deletions

View File

@ -277,3 +277,25 @@ export const getSplitRanges = (bytesTotal: number, bytesEachPart: number) => {
/**
 * Returns the built-in type tag of a value, i.e. the `Xxx` part of the
 * `[object Xxx]` string produced by `Object.prototype.toString`.
 *
 * @param obj any value
 * @returns the tag name, e.g. "Array", "Object", "Null"
 */
export const getTypeName = (obj: any) => {
  const tagged: string = Object.prototype.toString.call(obj);
  // drop the leading "[object " (8 chars) and the trailing "]"
  return tagged.substring(8, tagged.length - 1);
};
/**
 * Computes the depth of a relative path, starting from 1.
 *
 * A single trailing "/" is ignored before counting, so "x/" is level 1,
 * "x/y/" is level 2 and "x/y/z.md" is level 3.
 *
 * @param x relative path of a file or folder, using "/" as the separator
 * @returns the number of "/"-separated segments in the path
 * @throws Error if `x` is undefined, null, empty, ".", "..", or starts with "/"
 */
export const atWhichLevel = (x: string) => {
  if (
    x === undefined ||
    x === null ||
    x === "" ||
    x === "." ||
    x === ".." ||
    x.startsWith("/")
  ) {
    // null is rejected here too, so callers get this descriptive error
    // instead of a TypeError from calling startsWith on null
    throw new Error(`do not know which level for ${x}`);
  }
  // strip one trailing slash so that "x/" counts the same as "x"
  const trimmed = x.endsWith("/") ? x.slice(0, -1) : x;
  return trimmed.split("/").length;
};

View File

@ -32,6 +32,7 @@ import {
mkdirpInVault,
getFolderLevels,
getParentFolder,
atWhichLevel,
} from "./misc";
import { RemoteClient } from "./remote";
import {
@ -43,10 +44,9 @@ import {
DEFAULT_FILE_NAME_FOR_METADATAONREMOTE2,
isEqualMetadataOnRemote,
} from "./metadataOnRemote";
import { isInsideObsFolder, ObsConfigDirFileType } from "./obsFolderLister";
import * as origLog from "loglevel";
import { padEnd } from "lodash";
import { isInsideObsFolder, ObsConfigDirFileType } from "./obsFolderLister";
const log = origLog.getLogger("rs-default");
export type SyncStatusType =
@ -933,6 +933,69 @@ const dispatchOperationToActual = async (
}
};
/**
 * Partitions the operations of a sync plan into three groups, each being a
 * list of "buckets" whose members can be processed together:
 *
 * - `folderCreationOps`: "createFolder" entries, bucketed by path level,
 *   ordered from the shallowest level to the deepest.
 * - `deletionOps`: deletion-history entries, bucketed by path level,
 *   ordered from the deepest level to the shallowest.
 * - `uploadDownloads`: upload / download entries, all in one bucket
 *   (the outer array stays empty when there are none).
 *
 * Entries whose decision is "skipFolder" or "skipUploading" are dropped.
 * A level without any operation leaves a hole (`undefined`) in the
 * corresponding outer array, so consumers must tolerate missing buckets.
 *
 * @param syncPlan the plan whose `mixedStates` map is read
 * @param sortedKeys the keys of `mixedStates` to process, in order
 * @returns the three groups plus `realTotalCount`, the number of
 *   non-skipped operations
 * @throws Error when an entry carries any other decision (including
 *   undefined), or when `atWhichLevel` rejects a key
 */
const splitThreeSteps = (syncPlan: SyncPlanType, sortedKeys: string[]) => {
  const mixedStates = syncPlan.mixedStates;

  const folderCreationOps: FileOrFolderMixedState[][] = [];
  const deletionOps: FileOrFolderMixedState[][] = [];
  const uploadDownloads: FileOrFolderMixedState[][] = [];
  let realTotalCount = 0;

  // Appends `op` to the bucket at `index`, creating the bucket on first use.
  const addToBucket = (
    buckets: FileOrFolderMixedState[][],
    index: number,
    op: FileOrFolderMixedState
  ) => {
    if (buckets[index] === undefined) {
      buckets[index] = [op];
    } else {
      buckets[index].push(op);
    }
  };

  for (const key of sortedKeys) {
    // shallow copy so later changes to the returned entries do not touch the plan
    const val: FileOrFolderMixedState = Object.assign({}, mixedStates[key]);

    switch (val.decision) {
      case "skipFolder":
      case "skipUploading":
        // nothing to do for skipped entries
        break;

      case "createFolder":
        addToBucket(folderCreationOps, atWhichLevel(key) - 1, val);
        realTotalCount += 1;
        break;

      case "uploadLocalDelHistToRemoteFolder":
      case "keepRemoteDelHistFolder":
      case "uploadLocalDelHistToRemote":
      case "keepRemoteDelHist":
        addToBucket(deletionOps, atWhichLevel(key) - 1, val);
        realTotalCount += 1;
        break;

      case "uploadLocalToRemote":
      case "downloadRemoteToLocal":
        // only one level needed here
        addToBucket(uploadDownloads, 0, val);
        realTotalCount += 1;
        break;

      default:
        throw Error(`unknown decision ${val.decision} for ${key}`);
    }
  }

  // deletionOps was filled from the shallowest level to the deepest, but
  // deletions must run from the deepest level to the shallowest,
  // so reverse it (in place).
  deletionOps.reverse();

  return {
    folderCreationOps: folderCreationOps,
    deletionOps: deletionOps,
    uploadDownloads: uploadDownloads,
    realTotalCount: realTotalCount,
  };
};
export const doActualSync = async (
client: RemoteClient,
db: InternalDBs,
@ -987,95 +1050,47 @@ export const doActualSync = async (
);
log.debug(`finished ${key}`);
}
} else {
return; // shortcut return, avoid too many nests below
}
const { folderCreationOps, deletionOps, uploadDownloads, realTotalCount } =
splitThreeSteps(syncPlan, sortedKeys);
const nested = [folderCreationOps, deletionOps, uploadDownloads];
const logTexts = [
`1. create all folders from shadowest to deepest, also check undefined decision`,
`2. delete files and folders from deepest to shadowest`,
`3. upload or download files in parallel, with the desired concurrency=${concurrency}`,
];
let realCounter = 0;
log.debug(
`1. create all folders from shadowest to deepest, also check undefined decision`
);
for (let i = sortedKeys.length - 1; i >= 0; --i) {
const key = sortedKeys[i];
const val = mixedStates[key];
for (let i = 0; i < nested.length; ++i) {
log.debug(logTexts[i]);
if (
val.decision === undefined ||
val.decision === "skipFolder" ||
val.decision === "createFolder"
) {
log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`);
const operations: FileOrFolderMixedState[][] = nested[i];
if (callbackSyncProcess !== undefined) {
await callbackSyncProcess(realCounter, totalCount, key, val.decision);
}
realCounter += 1;
for (let j = 0; j < operations.length; ++j) {
const singleLevelOps: FileOrFolderMixedState[] | undefined =
operations[j];
await dispatchOperationToActual(
key,
vaultRandomID,
val,
client,
db,
vault,
localDeleteFunc,
password
);
log.debug(`finished ${key}`);
}
if (singleLevelOps === undefined || singleLevelOps === null) {
continue;
}
log.debug(`2. delete files and folders from deepest to shadowest`);
for (let i = 0; i < sortedKeys.length; ++i) {
const key = sortedKeys[i];
const val = mixedStates[key];
if (
val.decision === "uploadLocalDelHistToRemoteFolder" ||
val.decision === "keepRemoteDelHistFolder" ||
val.decision === "uploadLocalDelHistToRemote" ||
val.decision === "keepRemoteDelHist"
) {
log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`);
if (callbackSyncProcess !== undefined) {
await callbackSyncProcess(realCounter, totalCount, key, val.decision);
}
realCounter += 1;
await dispatchOperationToActual(
key,
vaultRandomID,
val,
client,
db,
vault,
localDeleteFunc,
password
);
log.debug(`finished ${key}`);
}
}
log.debug(
`3. upload or download files in parallel, with the desired concurrency=${concurrency}`
);
const queue = new PQueue({ concurrency: concurrency, autoStart: true });
// const commands: any[] = [];
for (let k = 0; k < singleLevelOps.length; ++k) {
const val: FileOrFolderMixedState = singleLevelOps[k];
const key = val.key;
for (let i = 0; i < sortedKeys.length; ++i) {
const key = sortedKeys[i];
const val = mixedStates[key];
if (
val.decision === "skipUploading" ||
val.decision === "uploadLocalToRemote" ||
val.decision === "downloadRemoteToLocal"
) {
const fn = async () => {
log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`);
if (callbackSyncProcess !== undefined) {
await callbackSyncProcess(
realCounter,
totalCount,
realTotalCount,
key,
val.decision
);
@ -1098,8 +1113,8 @@ export const doActualSync = async (
};
queue.add(fn);
}
}
await queue.onIdle();
}
}
};

View File

@ -244,3 +244,25 @@ describe("Misc: get split ranges", () => {
expect(k).to.deep.equal(k2);
});
});
// Tests for misc.atWhichLevel: rejected inputs, folder paths (trailing "/"),
// and file paths.
describe("Misc: at which level", () => {
  it("should throw error on some parameters", () => {
    const rejectedInputs = [undefined, "", "..", ".", "/", "/xxyy"];
    for (const input of rejectedInputs) {
      expect(() => misc.atWhichLevel(input)).to.throw();
    }
  });

  it("should treat folders correctly", () => {
    const folderCases: [string, number][] = [
      ["x/", 1],
      ["x/y/", 2],
    ];
    for (const [folderPath, expectedLevel] of folderCases) {
      expect(misc.atWhichLevel(folderPath)).to.be.equal(expectedLevel);
    }
  });

  it("should treat files correctly", () => {
    const fileCases: [string, number][] = [
      ["x.md", 1],
      ["x/y.md", 2],
      ["x/y/z.md", 3],
    ];
    for (const [filePath, expectedLevel] of fileCases) {
      expect(misc.atWhichLevel(filePath)).to.be.equal(expectedLevel);
    }
  });
});