~xdavidwu/saf-cephfs

bf4439c3ae04a7b54e81e9c30d1ed919218df049 — Pinghao Wu 4 months ago ca3d19a
CephFSExecutor: impl mount mgmt, also bring to fd callback
3 files changed, 149 insertions(+), 112 deletions(-)

M src/main/java/org/safcephfs/CephFSDocumentsProvider.java
R src/main/java/org/safcephfs/{CephFSOperations.java => CephFSExecutor.java}
M src/main/java/org/safcephfs/CephFSProxyFileDescriptorCallback.java
M src/main/java/org/safcephfs/CephFSDocumentsProvider.java => src/main/java/org/safcephfs/CephFSDocumentsProvider.java +57 -78
@@ 32,7 32,7 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Vector;
import java.util.HashMap;
import java.util.Locale;

import com.ceph.fs.CephMount;


@@ 41,10 41,9 @@ import com.ceph.fs.CephStatVFS;
import com.ceph.fs.CephNotDirectoryException;

public class CephFSDocumentsProvider extends DocumentsProvider {
	private String id, mon, path, key;
	private StorageManager sm;
	private Handler ioHandler;
	private CephMount cm = null;
	private CephFSExecutor executor;
	private int uid;
	private ToastThread lthread;



@@ 114,46 113,6 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
		lthread.handler.sendMessage(msg);
	}

	private CephFSOperations.Operation<CephMount> setupMount = () -> {
		SharedPreferences settings = PreferenceManager
			.getDefaultSharedPreferences(getContext());
		mon = settings.getString("mon", "");
		key = settings.getString("key", "");
		id = settings.getString("id", "");
		path = settings.getString("path", "");
		String timeout = settings.getString("timeout", "");
		timeout = timeout.matches("\\d+") ? timeout : "20";
		CephMount newMount = new CephMount(id);
		newMount.conf_set("mon_host", mon);
		newMount.conf_set("key", key);
		newMount.conf_set("client_mount_timeout", timeout);
		newMount.conf_set("client_dirsize_rbytes", "false");
		checkPermissions = settings.getBoolean("permissions", true);
		if (!checkPermissions) {
			newMount.conf_set("client_permissions", "false");
		}
		newMount.mount(path); // IOException if fails
		return newMount;
	};

	// TODO refactor to a executor, and port to CephFSProxyFileDescriptorCallback
	private <T> CephFSOperations.Operation<T> withLazyRetriedMount(
			CephFSOperations.Operation<T> op) {
		return () -> {
			if (cm == null) {
				cm = setupMount.execute();
			}
			return CephFSOperations.retryOnESHUTDOWN(
				() -> {
					cm.unmount();
					Log.e(APP_NAME, "Mount died, retrying");
					cm = setupMount.execute();
					return null;
				},
				op).execute();
		};
	}

	@Override
	public boolean onCreate() {
		APP_NAME = getContext().getString(R.string.app_name);


@@ 165,6 124,35 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
		ioThread.start();
		ioHandler = new Handler(ioThread.getLooper());
		uid = Process.myUid();

		SharedPreferences settings = PreferenceManager
			.getDefaultSharedPreferences(getContext());
		SharedPreferences.OnSharedPreferenceChangeListener l = (sp, key) -> {
			var id = sp.getString("id", "");
			var path = sp.getString("path", "");
			var timeout = sp.getString("timeout", "");
			timeout = timeout.matches("\\d+") ? timeout : "20";
			var checkPermissions = sp.getBoolean("permissions", true);
			var config = new HashMap<String, String>();

			config.put("mon_host", sp.getString("mon", ""));
			config.put("key", sp.getString("key", ""));
			config.put("client_mount_timeout", timeout);
			config.put("client_dirsize_rbytes", "false");
			if (!checkPermissions) {
				config.put("client_permissions", "false");
			}

			config.put("debug_javaclient", "20");
			config.put("debug_ms", "1");
			config.put("debug_client", "10");
			config.put("ms_connection_ready_timeout", "3");

			var c = new CephFSExecutor.CephMountConfig(id, path, config);
			executor = new CephFSExecutor(c);
		};
		settings.registerOnSharedPreferenceChangeListener(l);
		l.onSharedPreferenceChanged(settings, "");
		return true;
	}



@@ 174,16 162,16 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
		var path = Uri.parse(parentDocumentId).getPath();
		String filename = path + "/" + displayName;
		if (mimeType.equals(Document.MIME_TYPE_DIR)) {
			CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> {
			executor.executeWithUnchecked(cm -> {
				cm.mkdir(filename, 0700);
				return null;
			}));
			});
		} else {
			CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> {
			executor.executeWithUnchecked(cm -> {
				int fd = cm.open(filename, CephMount.O_WRONLY | CephMount.O_CREAT | CephMount.O_EXCL, 0700);
				cm.close(fd);
				return null;
			}));
			});
		}
		return parentDocumentId + "/" + filename;
	}


@@ 217,16 205,12 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
			throw new UnsupportedOperationException("Mode " + mode + " not implemented");
		}

		int fd = CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> {
			return cm.open(path, flag, 0);
		}));

		try {
			return sm.openProxyFileDescriptor(
				fdmode, new CephFSProxyFileDescriptorCallback(cm, fd), ioHandler);
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return executor.executeWithUnchecked(cm -> {
			int fd = cm.open(path, flag, 0);
			return sm.openProxyFileDescriptor(fdmode,
				new CephFSProxyFileDescriptorCallback(executor, cm, fd, path, flag),
				ioHandler);
		});
	}

	private String getXDGThumbnailFile(String name) {


@@ 261,14 245,14 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
			throws FileNotFoundException {
		// TODO consider EXTRA_ERROR?
		CephStat cs = new CephStat();
		CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> {
		executor.executeWithUnchecked(cm -> {
			try {
				cm.lstat(dir + displayName, cs);
				return null;
			} catch (CephNotDirectoryException e) {
				throw new FileNotFoundException(e.getMessage());
			}
		}));
		});
		MatrixCursor.RowBuilder row = result.newRow();
		row.add(Document.COLUMN_DOCUMENT_ID, documentId);
		row.add(Document.COLUMN_DISPLAY_NAME, displayName);


@@ 276,7 260,7 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
		row.add(Document.COLUMN_LAST_MODIFIED, cs.m_time);

		if (cs.isSymlink()) {
			CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> {
			executor.executeWithUnchecked(cm -> {
				try {
					cm.stat(dir + displayName, cs);
					return null;


@@ 284,7 268,7 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
					Log.e(APP_NAME, "stat: " + dir + displayName + " not found", e);
					return null;
				}
			}));
			});
		}
		String mimeType = getMime(cs.mode, displayName);
		row.add(Document.COLUMN_MIME_TYPE, mimeType);


@@ 319,14 303,14 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
				}
			} else {
				String thubmailPath = dir + ".sh_thumbnails/normal/" + getXDGThumbnailFile(displayName);
				thumbnailFound = CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> {
				thumbnailFound = executor.executeWithUnchecked(cm -> {
					try {
						cm.stat(thubmailPath, cs);
						return true;
					} catch (FileNotFoundException|CephNotDirectoryException e) {
						return false;
					}
				}));
				});
			}

			if (thumbnailFound) {


@@ 342,22 326,22 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
		var path = Uri.parse(parentDocumentId).getPath();
		MatrixCursor result = new MatrixCursor(projection != null ? projection : DEFAULT_DOC_PROJECTION);
		Log.v(APP_NAME, "queryChildDocuments " + parentDocumentId);
		String[] res = CephFSOperations.translateToCursorExtra(withLazyRetriedMount(() -> {
		String[] res = executor.executeWithCursorExtra(cm -> {
			return cm.listdir(path);
		}), result);
		}, result);
		if (res == null) {
			return result;
		}

		// TODO make this not fatal instead?
		String[] thumbnails = CephFSOperations.translateToCursorExtra(withLazyRetriedMount(() -> {
		String[] thumbnails = executor.executeWithCursorExtra(cm -> {
			try {
				return cm.listdir(path + "/.sh_thumbnails/normal");
			} catch (FileNotFoundException e) {
				return new String[0];
			}
		}), result);
		if (res == null) {
		}, result);
		if (thumbnails == null) {
			return result;
		}



@@ 416,25 400,20 @@ public class CephFSDocumentsProvider extends DocumentsProvider {
	public Cursor queryRoots(String[] projection)
			throws FileNotFoundException {
		MatrixCursor result = new MatrixCursor(projection != null ? projection : DEFAULT_ROOT_PROJECTION);
		if (cm != null) {
			cm.unmount();
			cm = null;
		}
		CephStatVFS csvfs = new CephStatVFS();
		CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> {
		executor.executeWithUnchecked(cm -> {
			cm.statfs(".", csvfs);
			return null;
		}));
		});

		var builder = new Uri.Builder();
		var rootUri = builder.scheme("cephfs").authority(id + "@" + mon).build();
		var rootUri = executor.config.getRootUri();
		MatrixCursor.RowBuilder row = result.newRow();
		row.add(Root.COLUMN_ROOT_ID, rootUri);
		row.add(Root.COLUMN_DOCUMENT_ID, rootUri);
		row.add(Root.COLUMN_FLAGS, Root.FLAG_SUPPORTS_CREATE | Root.FLAG_SUPPORTS_IS_CHILD);
		row.add(Root.COLUMN_TITLE, mon + ":" + path);
		row.add(Root.COLUMN_TITLE, executor.config.getTitle());
		row.add(Root.COLUMN_ICON, R.mipmap.sym_def_app_icon);
		row.add(Root.COLUMN_SUMMARY, "CephFS with user: " + id);
		row.add(Root.COLUMN_SUMMARY, executor.config.getSummary());
		row.add(Root.COLUMN_CAPACITY_BYTES, csvfs.blocks * csvfs.frsize);
		row.add(Root.COLUMN_AVAILABLE_BYTES, csvfs.bavail * csvfs.frsize);
		return result;

R src/main/java/org/safcephfs/CephFSOperations.java => src/main/java/org/safcephfs/CephFSExecutor.java +62 -24
@@ 1,6 1,7 @@
package org.safcephfs;

import android.database.Cursor;
import android.net.Uri;
import android.os.Bundle;
import android.provider.DocumentsContract;
import android.system.ErrnoException;


@@ 8,12 9,47 @@ import android.system.OsConstants;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

import com.ceph.fs.CephMount;

public class CephFSExecutor {
	protected record CephMountConfig(
			String id, String path, Map<String, String> config) {

		protected String getRootUri() {
			var builder = new Uri.Builder();
			var uri = builder.scheme("cephfs").authority(id + "@" + config.get("mon_host")).build();
			return uri.toString();
		}

		protected String getTitle() {
			return config.get("mon_host") + ":" + path;
		}

		protected String getSummary() {
			return "CephFS with user: " + id;
		}
	}

	protected CephMountConfig config;
	private CephMount cm;

	protected CephFSExecutor(CephMountConfig config) {
		this.config = config;
	}

public class CephFSOperations {
	protected interface Operation<T> {
		T execute() throws IOException;
		T execute(CephMount cm) throws IOException;
	}

	protected Operation<CephMount> mount = unused -> {
		CephMount m = new CephMount(config.id);
		config.config.forEach((k, v) -> m.conf_set(k, v));
		m.mount(config.path);
		return m;
	};

	/*
	 * libcephfs_jni throws IOException with message from strerror()
	 * Bionic strerror:


@@ 105,18 141,36 @@ public class CephFSOperations {
		};
	}

	protected static <T> T translateToErrnoException(
	protected <T> T execute(Operation<T> op) throws IOException {
		if (cm == null) {
			cm = this.mount.execute(null);
		}
		try {
			return op.execute(cm);
		} catch (IOException e) {
			// ESHUTDOWN
			if (e.getMessage().equals("Cannot send after transport endpoint shutdown")) {
				cm.unmount();
				cm = this.mount.execute(null);
				return op.execute(cm);
			} else {
				throw e;
			}
		}
	}

	protected <T> T executeWithErrnoException(
			String functionName, Operation<T> op) throws ErrnoException {
		try {
			return op.execute();
			return execute(op);
		} catch (IOException e) {
			throw new ErrnoException(functionName, cephIOEToOsConstants(e));
		}
	}

	protected static <T> T translateToCursorExtra(Operation<T> op, Cursor c) {
	protected <T> T executeWithCursorExtra(Operation<T> op, Cursor c) {
		try {
			return op.execute();
			return execute(op);
		} catch (IOException e) {
			var extra = new Bundle();
			extra.putString(DocumentsContract.EXTRA_ERROR, e.getMessage());


@@ 125,28 179,12 @@ public class CephFSOperations {
		}
	}

	protected static <T> T translateToUnchecked(Operation<T> op) {
	protected <T> T executeWithUnchecked(Operation<T> op) {
		try {
			return op.execute();
			return execute(op);
		// TODO preserve IOE subclasses
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
	}

	protected static <T> Operation<T> retryOnESHUTDOWN(
			Operation<Object> setup, Operation<T> op) {
		return () -> {
			try {
				return op.execute();
			} catch (IOException e) {
				if (e.getMessage().equals("Cannot send after transport endpoint shutdown")) {
					setup.execute();
					return op.execute();
				} else {
					throw e;
				}
			}
		};
	}
}

M src/main/java/org/safcephfs/CephFSProxyFileDescriptorCallback.java => src/main/java/org/safcephfs/CephFSProxyFileDescriptorCallback.java +30 -10
@@ 10,18 10,38 @@ import com.ceph.fs.CephMount;
import com.ceph.fs.CephStat;

public class CephFSProxyFileDescriptorCallback extends ProxyFileDescriptorCallback {
	private CephFSExecutor executor;
	private CephMount cm;
	private int fd;
	private String path;
	private int fd, mode;

	public CephFSProxyFileDescriptorCallback(CephMount cm, int fd) {
	public CephFSProxyFileDescriptorCallback(
			CephFSExecutor executor, CephMount cm, int fd,
			String path, int mode) {
		this.cm = cm;
		this.fd = fd;
		this.executor = executor;
		this.path = path;
		this.mode = mode;
	}

	private <T> CephFSExecutor.Operation<T> reopenIfNeeded(
			CephFSExecutor.Operation<T> op) {
		return cm -> {
			if (cm != this.cm) {
				fd = cm.open(path, mode, 0);
				this.cm = cm;
			};
			return op.execute(cm);
		};
	};

	@Override
	public void onFsync() throws ErrnoException {
		CephFSOperations.translateToErrnoException("fsync", () -> {
			cm.fsync(fd, false);
		executor.executeWithErrnoException("fsync", cm -> {
			if (cm == this.cm) {
				cm.fsync(fd, false);
			}
			return null;
		});
	}


@@ 29,19 49,19 @@ public class CephFSProxyFileDescriptorCallback extends ProxyFileDescriptorCallba
	@Override
	public long onGetSize() throws ErrnoException {
		CephStat cs = new CephStat();
		CephFSOperations.translateToErrnoException("fstat", () -> {
		executor.executeWithErrnoException("fstat", reopenIfNeeded(cm -> {
			cm.fstat(fd, cs);
			return null;
		});
		}));
		return cs.size;
	}

	@Override
	public int onRead(long offset, int size, byte[] data)
		throws ErrnoException {
		return CephFSOperations.translateToErrnoException("read", () -> {
		return executor.executeWithErrnoException("read", reopenIfNeeded(cm -> {
			return cm.read(fd, data, size, offset);
		}).intValue();
		})).intValue();
	}

	@Override


@@ 52,8 72,8 @@ public class CephFSProxyFileDescriptorCallback extends ProxyFileDescriptorCallba
	@Override
	public int onWrite(long offset, int size, byte[] data)
		throws ErrnoException {
		return CephFSOperations.translateToErrnoException("write", () -> {
		return executor.executeWithErrnoException("write", reopenIfNeeded(cm -> {
			return cm.write(fd, data, size, offset);
		}).intValue();
		})).intValue();
	}
}