From bf4439c3ae04a7b54e81e9c30d1ed919218df049 Mon Sep 17 00:00:00 2001 From: Pinghao Wu Date: Fri, 25 Oct 2024 14:25:26 +0800 Subject: [PATCH] CephFSExecutor: impl mount mgmt, also bring to fd callback --- .../safcephfs/CephFSDocumentsProvider.java | 135 ++++++++---------- ...hFSOperations.java => CephFSExecutor.java} | 86 +++++++---- .../CephFSProxyFileDescriptorCallback.java | 40 ++++-- 3 files changed, 149 insertions(+), 112 deletions(-) rename src/main/java/org/safcephfs/{CephFSOperations.java => CephFSExecutor.java} (78%) diff --git a/src/main/java/org/safcephfs/CephFSDocumentsProvider.java b/src/main/java/org/safcephfs/CephFSDocumentsProvider.java index 46c0c45..fac2d4b 100644 --- a/src/main/java/org/safcephfs/CephFSDocumentsProvider.java +++ b/src/main/java/org/safcephfs/CephFSDocumentsProvider.java @@ -32,7 +32,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.Vector; +import java.util.HashMap; import java.util.Locale; import com.ceph.fs.CephMount; @@ -41,10 +41,9 @@ import com.ceph.fs.CephStatVFS; import com.ceph.fs.CephNotDirectoryException; public class CephFSDocumentsProvider extends DocumentsProvider { - private String id, mon, path, key; private StorageManager sm; private Handler ioHandler; - private CephMount cm = null; + private CephFSExecutor executor; private int uid; private ToastThread lthread; @@ -114,46 +113,6 @@ public class CephFSDocumentsProvider extends DocumentsProvider { lthread.handler.sendMessage(msg); } - private CephFSOperations.Operation setupMount = () -> { - SharedPreferences settings = PreferenceManager - .getDefaultSharedPreferences(getContext()); - mon = settings.getString("mon", ""); - key = settings.getString("key", ""); - id = settings.getString("id", ""); - path = settings.getString("path", ""); - String timeout = settings.getString("timeout", ""); - timeout = timeout.matches("\\d+") ? timeout : "20"; - CephMount newMount = new CephMount(id); - newMount.conf_set("mon_host", mon); - newMount.conf_set("key", key); - newMount.conf_set("client_mount_timeout", timeout); - newMount.conf_set("client_dirsize_rbytes", "false"); - checkPermissions = settings.getBoolean("permissions", true); - if (!checkPermissions) { - newMount.conf_set("client_permissions", "false"); - } - newMount.mount(path); // IOException if fails - return newMount; - }; - - // TODO refactor to a executor, and port to CephFSProxyFileDescriptorCallback - private CephFSOperations.Operation withLazyRetriedMount( - CephFSOperations.Operation op) { - return () -> { - if (cm == null) { - cm = setupMount.execute(); - } - return CephFSOperations.retryOnESHUTDOWN( - () -> { - cm.unmount(); - Log.e(APP_NAME, "Mount died, retrying"); - cm = setupMount.execute(); - return null; - }, - op).execute(); - }; - } - @Override public boolean onCreate() { APP_NAME = getContext().getString(R.string.app_name); @@ -165,6 +124,35 @@ public class CephFSDocumentsProvider extends DocumentsProvider { ioThread.start(); ioHandler = new Handler(ioThread.getLooper()); uid = Process.myUid(); + + SharedPreferences settings = PreferenceManager + .getDefaultSharedPreferences(getContext()); + SharedPreferences.OnSharedPreferenceChangeListener l = (sp, key) -> { + var id = sp.getString("id", ""); + var path = sp.getString("path", ""); + var timeout = sp.getString("timeout", ""); + timeout = timeout.matches("\\d+") ? timeout : "20"; + var checkPermissions = sp.getBoolean("permissions", true); + var config = new HashMap(); + + config.put("mon_host", sp.getString("mon", "")); + config.put("key", sp.getString("key", "")); + config.put("client_mount_timeout", timeout); + config.put("client_dirsize_rbytes", "false"); + if (!checkPermissions) { + config.put("client_permissions", "false"); + } + + config.put("debug_javaclient", "20"); + config.put("debug_ms", "1"); + config.put("debug_client", "10"); + config.put("ms_connection_ready_timeout", "3"); + + var c = new CephFSExecutor.CephMountConfig(id, path, config); + executor = new CephFSExecutor(c); + }; + settings.registerOnSharedPreferenceChangeListener(l); + l.onSharedPreferenceChanged(settings, ""); return true; } @@ -174,16 +162,16 @@ public class CephFSDocumentsProvider extends DocumentsProvider { var path = Uri.parse(parentDocumentId).getPath(); String filename = path + "/" + displayName; if (mimeType.equals(Document.MIME_TYPE_DIR)) { - CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> { + executor.executeWithUnchecked(cm -> { cm.mkdir(filename, 0700); return null; - })); + }); } else { - CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> { + executor.executeWithUnchecked(cm -> { int fd = cm.open(filename, CephMount.O_WRONLY | CephMount.O_CREAT | CephMount.O_EXCL, 0700); cm.close(fd); return null; - })); + }); } return parentDocumentId + "/" + filename; } @@ -217,16 +205,12 @@ public class CephFSDocumentsProvider extends DocumentsProvider { throw new UnsupportedOperationException("Mode " + mode + " not implemented"); } - int fd = CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> { - return cm.open(path, flag, 0); - })); - - try { - return sm.openProxyFileDescriptor( - fdmode, new CephFSProxyFileDescriptorCallback(cm, fd), ioHandler); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + return executor.executeWithUnchecked(cm -> { + int fd = cm.open(path, flag, 0); + return sm.openProxyFileDescriptor(fdmode, + new CephFSProxyFileDescriptorCallback(executor, cm, fd, path, flag), + ioHandler); + }); } private String getXDGThumbnailFile(String name) { @@ -261,14 +245,14 @@ public class CephFSDocumentsProvider extends DocumentsProvider { throws FileNotFoundException { // TODO consider EXTRA_ERROR? CephStat cs = new CephStat(); - CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> { + executor.executeWithUnchecked(cm -> { try { cm.lstat(dir + displayName, cs); return null; } catch (CephNotDirectoryException e) { throw new FileNotFoundException(e.getMessage()); } - })); + }); MatrixCursor.RowBuilder row = result.newRow(); row.add(Document.COLUMN_DOCUMENT_ID, documentId); row.add(Document.COLUMN_DISPLAY_NAME, displayName); @@ -276,7 +260,7 @@ public class CephFSDocumentsProvider extends DocumentsProvider { row.add(Document.COLUMN_LAST_MODIFIED, cs.m_time); if (cs.isSymlink()) { - CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> { + executor.executeWithUnchecked(cm -> { try { cm.stat(dir + displayName, cs); return null; @@ -284,7 +268,7 @@ public class CephFSDocumentsProvider extends DocumentsProvider { Log.e(APP_NAME, "stat: " + dir + displayName + " not found", e); return null; } - })); + }); } String mimeType = getMime(cs.mode, displayName); row.add(Document.COLUMN_MIME_TYPE, mimeType); @@ -319,14 +303,14 @@ public class CephFSDocumentsProvider extends DocumentsProvider { } } else { String thubmailPath = dir + ".sh_thumbnails/normal/" + getXDGThumbnailFile(displayName); - thumbnailFound = CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> { + thumbnailFound = executor.executeWithUnchecked(cm -> { try { cm.stat(thubmailPath, cs); return true; } catch (FileNotFoundException|CephNotDirectoryException e) { return false; } - })); + }); } if (thumbnailFound) { @@ -342,22 +326,22 @@ public class CephFSDocumentsProvider extends DocumentsProvider { var path = Uri.parse(parentDocumentId).getPath(); MatrixCursor result = new MatrixCursor(projection != null ? projection : DEFAULT_DOC_PROJECTION); Log.v(APP_NAME, "queryChildDocuments " + parentDocumentId); - String[] res = CephFSOperations.translateToCursorExtra(withLazyRetriedMount(() -> { + String[] res = executor.executeWithCursorExtra(cm -> { return cm.listdir(path); - }), result); + }, result); if (res == null) { return result; } // TODO make this not fatal instead? - String[] thumbnails = CephFSOperations.translateToCursorExtra(withLazyRetriedMount(() -> { + String[] thumbnails = executor.executeWithCursorExtra(cm -> { try { return cm.listdir(path + "/.sh_thumbnails/normal"); } catch (FileNotFoundException e) { return new String[0]; } - }), result); - if (res == null) { + }, result); + if (thumbnails == null) { return result; } @@ -416,25 +400,20 @@ public class CephFSDocumentsProvider extends DocumentsProvider { public Cursor queryRoots(String[] projection) throws FileNotFoundException { MatrixCursor result = new MatrixCursor(projection != null ? projection : DEFAULT_ROOT_PROJECTION); - if (cm != null) { - cm.unmount(); - cm = null; - } CephStatVFS csvfs = new CephStatVFS(); - CephFSOperations.translateToUnchecked(withLazyRetriedMount(() -> { + executor.executeWithUnchecked(cm -> { cm.statfs(".", csvfs); return null; - })); + }); - var builder = new Uri.Builder(); - var rootUri = builder.scheme("cephfs").authority(id + "@" + mon).build(); + var rootUri = executor.config.getRootUri(); MatrixCursor.RowBuilder row = result.newRow(); row.add(Root.COLUMN_ROOT_ID, rootUri); row.add(Root.COLUMN_DOCUMENT_ID, rootUri); row.add(Root.COLUMN_FLAGS, Root.FLAG_SUPPORTS_CREATE | Root.FLAG_SUPPORTS_IS_CHILD); - row.add(Root.COLUMN_TITLE, mon + ":" + path); + row.add(Root.COLUMN_TITLE, executor.config.getTitle()); row.add(Root.COLUMN_ICON, R.mipmap.sym_def_app_icon); - row.add(Root.COLUMN_SUMMARY, "CephFS with user: " + id); + row.add(Root.COLUMN_SUMMARY, executor.config.getSummary()); row.add(Root.COLUMN_CAPACITY_BYTES, csvfs.blocks * csvfs.frsize); row.add(Root.COLUMN_AVAILABLE_BYTES, csvfs.bavail * csvfs.frsize); return result; diff --git a/src/main/java/org/safcephfs/CephFSOperations.java b/src/main/java/org/safcephfs/CephFSExecutor.java similarity index 78% rename from src/main/java/org/safcephfs/CephFSOperations.java rename to src/main/java/org/safcephfs/CephFSExecutor.java index cbfe784..db22b30 100644 --- a/src/main/java/org/safcephfs/CephFSOperations.java +++ b/src/main/java/org/safcephfs/CephFSExecutor.java @@ -1,6 +1,7 @@ package org.safcephfs; import android.database.Cursor; +import android.net.Uri; import android.os.Bundle; import android.provider.DocumentsContract; import android.system.ErrnoException; @@ -8,12 +9,47 @@ import android.system.OsConstants; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Map; + +import com.ceph.fs.CephMount; + +public class CephFSExecutor { + protected record CephMountConfig( + String id, String path, Map config) { + + protected String getRootUri() { + var builder = new Uri.Builder(); + var uri = builder.scheme("cephfs").authority(id + "@" + config.get("mon_host")).build(); + return uri.toString(); + } + + protected String getTitle() { + return config.get("mon_host") + ":" + path; + } + + protected String getSummary() { + return "CephFS with user: " + id; + } + } + + protected CephMountConfig config; + private CephMount cm; + + protected CephFSExecutor(CephMountConfig config) { + this.config = config; + } -public class CephFSOperations { protected interface Operation { - T execute() throws IOException; + T execute(CephMount cm) throws IOException; } + protected Operation mount = unused -> { + CephMount m = new CephMount(config.id); + config.config.forEach((k, v) -> m.conf_set(k, v)); + m.mount(config.path); + return m; + }; + /* * libcephfs_jni throws IOException with message from strerror() * Bionic strerror: @@ -105,18 +141,36 @@ public class CephFSOperations { }; } - protected static T translateToErrnoException( + protected T execute(Operation op) throws IOException { + if (cm == null) { + cm = this.mount.execute(null); + } + try { + return op.execute(cm); + } catch (IOException e) { + // ESHUTDOWN + if (e.getMessage().equals("Cannot send after transport endpoint shutdown")) { + cm.unmount(); + cm = this.mount.execute(null); + return op.execute(cm); + } else { + throw e; + } + } + } + + protected T executeWithErrnoException( String functionName, Operation op) throws ErrnoException { try { - return op.execute(); + return execute(op); } catch (IOException e) { throw new ErrnoException(functionName, cephIOEToOsConstants(e)); } } - protected static T translateToCursorExtra(Operation op, Cursor c) { + protected T executeWithCursorExtra(Operation op, Cursor c) { try { - return op.execute(); + return execute(op); } catch (IOException e) { var extra = new Bundle(); extra.putString(DocumentsContract.EXTRA_ERROR, e.getMessage()); @@ -125,28 +179,12 @@ public class CephFSOperations { } } - protected static T translateToUnchecked(Operation op) { + protected T executeWithUnchecked(Operation op) { try { - return op.execute(); + return execute(op); // TODO preserve IOE subclasses } catch (IOException e) { throw new UncheckedIOException(e); } } - - protected static Operation retryOnESHUTDOWN( - Operation setup, Operation op) { - return () -> { - try { - return op.execute(); - } catch (IOException e) { - if (e.getMessage().equals("Cannot send after transport endpoint shutdown")) { - setup.execute(); - return op.execute(); - } else { - throw e; - } - } - }; - } } diff --git a/src/main/java/org/safcephfs/CephFSProxyFileDescriptorCallback.java b/src/main/java/org/safcephfs/CephFSProxyFileDescriptorCallback.java index 6bab877..7337ddd 100644 --- a/src/main/java/org/safcephfs/CephFSProxyFileDescriptorCallback.java +++ b/src/main/java/org/safcephfs/CephFSProxyFileDescriptorCallback.java @@ -10,18 +10,38 @@ import com.ceph.fs.CephMount; import com.ceph.fs.CephStat; public class CephFSProxyFileDescriptorCallback extends ProxyFileDescriptorCallback { + private CephFSExecutor executor; private CephMount cm; - private int fd; + private String path; + private int fd, mode; - public CephFSProxyFileDescriptorCallback(CephMount cm, int fd) { + public CephFSProxyFileDescriptorCallback( + CephFSExecutor executor, CephMount cm, int fd, + String path, int mode) { this.cm = cm; this.fd = fd; + this.executor = executor; + this.path = path; + this.mode = mode; } + private CephFSExecutor.Operation reopenIfNeeded( + CephFSExecutor.Operation op) { + return cm -> { + if (cm != this.cm) { + fd = cm.open(path, mode, 0); + this.cm = cm; + }; + return op.execute(cm); + }; + }; + @Override public void onFsync() throws ErrnoException { - CephFSOperations.translateToErrnoException("fsync", () -> { - cm.fsync(fd, false); + executor.executeWithErrnoException("fsync", cm -> { + if (cm == this.cm) { + cm.fsync(fd, false); + } return null; }); } @@ -29,19 +49,19 @@ public class CephFSProxyFileDescriptorCallback extends ProxyFileDescriptorCallba @Override public long onGetSize() throws ErrnoException { CephStat cs = new CephStat(); - CephFSOperations.translateToErrnoException("fstat", () -> { + executor.executeWithErrnoException("fstat", reopenIfNeeded(cm -> { cm.fstat(fd, cs); return null; - }); + })); return cs.size; } @Override public int onRead(long offset, int size, byte[] data) throws ErrnoException { - return CephFSOperations.translateToErrnoException("read", () -> { + return executor.executeWithErrnoException("read", reopenIfNeeded(cm -> { return cm.read(fd, data, size, offset); - }).intValue(); + })).intValue(); } @Override @@ -52,8 +72,8 @@ public class CephFSProxyFileDescriptorCallback extends ProxyFileDescriptorCallba @Override public int onWrite(long offset, int size, byte[] data) throws ErrnoException { - return CephFSOperations.translateToErrnoException("write", () -> { + return executor.executeWithErrnoException("write", reopenIfNeeded(cm -> { return cm.write(fd, data, size, offset); - }).intValue(); + })).intValue(); } } -- 2.45.2