sway/fuzzel: fix nested dirs

This commit is contained in:
Josh Lay 2024-06-19 07:21:49 -05:00
parent 32c196e628
commit 0336b62c26
No known key found for this signature in database
GPG key ID: 47AA304B2243B579
23 changed files with 0 additions and 0 deletions

View file

@ -0,0 +1,42 @@
#!/usr/bin/env python3
'''simple screen locker for Sway, run by $mod+l'''
from os import environ
from i3ipc import Connection
def lock_session(manager):
'''function for the lock sequence
pauses any playing media and runs swaylock to lock the session'''
lock_commands = ['playerctl pause',
'swaylock -f']
for command in lock_commands:
print(f'Executing: {command}')
manager.command(f'exec {command}')
try:
# explicitly tied to sway/swaysock
# don't get the impression swaylock works w/ i3
# ...which this module also supports/would use naively
SWAYSOCK = environ['SWAYSOCK']
# use subprocess/xrandr to get the 4k display to (later) make it primary
# otherwise games seem to get confused on monitor/resolution
# with the socket, connect and lock
_wm = Connection(socket_path=SWAYSOCK, auto_reconnect=True)
lock_session(_wm)
# clean up, disconnect from WM
_wm.main_quit()
except IOError as e:
# Handle exceptions related to the connection
print("There was a problem establishing the connection, socket:", SWAYSOCK)
print(e)
except KeyError:
print('The "SWAYSOCK" var is not defined')

View file

@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""
This script provides screenshot handling for Sway
Determines the mode (region or window), and then takes a capture
Expects window manager keybinds like below:
bindsym --no-repeat Print exec "~/.config/sway/scripts/screenshot.py region"
bindsym --no-repeat Shift+Print exec "~/.config/sway/scripts/screenshot.py window"
If mode is 'region', the tool 'slurp' is used to select the region to capture.
If mode is 'window', the WM is asked which display is active (to capture).
In both cases, the 'grim' utility does the image capture. Captures go here:
~/Pictures/screenshots/Screenshot_*.png
Fedora dependencies:
- sudo dnf in python-{i3ipc,pillow}
Note: while i3ipc aspects of this will work with i3...
slurp/grim likely will not
"""
import argparse
import os
import subprocess
from time import strftime
from i3ipc import Connection, Event
from PIL import Image
from PIL.PngImagePlugin import PngInfo
# setup argparse
# take one arg, m/mode: selection/window
parser = argparse.ArgumentParser(
description='Take some screenshots for Sway using IPC and slurp/grim')
# add main arg, screenshot mode -- region (selection area) or [focused] window
parser.add_argument('mode', type=str, choices=['region', 'window', 'w', 'r'],
help='''Screenshot "mode", either:
A selected area ('region') or
the focused display ('window')"''')
# instantiate args
args = parser.parse_args()
# env prep
# get the current time
now = strftime("%Y-%m-%dT-%H%M%z")
# use strftime - similar iso format as 'datetime', with 1 minor fix
# no ':' - in the off chance they are uploaded to Jira
# ex: 2022-11-21T-2337-0600
# set a var for the homedir using os.environ - arguably not portable? /shrug
homedir = os.environ['HOME']
screenshot_dir = f'{homedir}/Pictures/screenshots'
screenshot_path = f'{screenshot_dir}/Screenshot_{now}.png'
preview_command = f"imv -d -s none '{screenshot_path}'"
# preview_command = f"xdg-open '{screenshot_path}'"
if not os.path.isdir(screenshot_dir):
print(f"Screenshot dir doesn't exist, creating {screenshot_dir}")
os.mkdir(screenshot_dir)
else:
print(f'Screenshot dir ({screenshot_dir}) exists, continuing')
# misc functions
def determine_ss_cmd():
'''based on mode, determine the screenshot command'''
# screenshot command handling (based on mode)
# grim uses compression level 1 in both cases
# neglible quality difference while saving space
if args.mode in ['window', 'w']:
# use wm connection to determine the active output
outputs = _wm.get_outputs()
for output in outputs:
if output.focused:
active_output = output.name
print(f'determined active output: {active_output}')
command = f"grim -l 1 -c -o {active_output} '{screenshot_path}'"
elif args.mode in ['region', 'r']:
# omits -c to leave out cursors
command = f"slurp -d | grim -l 1 -g - '{screenshot_path}'"
return command
def preview_focus(_wm, _event):
'''function called by window manager new_window events
checks if new window is preview, if so: give it focus'''
if _event.container.app_id == 'imv':
# give the preview focus
# for Sway we use 'app_id', for i3 this is probably 'class'
_wm.command('[app_id=imv] focus')
# once the preview window is focused, close our connection to the wm
_wm.main_quit()
def wm_connect():
'''get the party started, create a connection to the window manager'''
conn = Connection(auto_reconnect=True)
# on new window events check if screenshot preview window gets focus
conn.on(Event.WINDOW_NEW, preview_focus)
return conn
def _run_command(command):
print(f'Command: {command}')
_r = subprocess.run(command, shell=True, capture_output=True, check=False)
if _r.stderr:
raise subprocess.CalledProcessError(
returncode = _r.returncode,
cmd = _r.args,
stderr = _r.stderr
)
if _r.stdout:
print(f"Command Result: {_r.stdout.decode('utf-8')}")
return _r
# begin screenshot/preview process
# connect to the window manager -- Sway
# (i3 could work, may need grim/slurp replacements)
_wm = wm_connect()
# determine screenshot command - differs if window or region mode
screenshot_command = determine_ss_cmd()
# run the screenshot/preview commands
# previewing/sending focus only if screenshot is taken
SS_RC = -1
try:
ss_result = _run_command(screenshot_command)
SS_RC = ss_result.returncode
except subprocess.CalledProcessError as error:
print('screenshot failed/aborted')
# clean up after ourselves, close the wm loop
_wm.main_quit()
# if the screenshot succeeded, place a comment on the image
# and then preview/focus the window
if SS_RC == 0:
#
# construct the comment for the screenshot
# immediately after it's taken, find the focused window details
wm_tree = _wm.get_tree()
wm_focused = wm_tree.find_focused()
app_name = wm_focused.name
app_id = wm_focused.app_id
COMMENT = f"Screenshot of '{app_name} (app_id: {app_id}) at {now}'"
print(f'storing comment: {COMMENT}')
#
# open the screenshot for (metadata) modification
# adding the application title/window class/date as a comment
# visible using 'exiftool', easier sorting through command line
ss_obj = Image.open(screenshot_path)
ss_metadata = PngInfo()
ss_metadata.add_text("Comment", COMMENT)
#
# write the (commented) image back out
ss_obj.save(screenshot_path, pnginfo=ss_metadata)
#
# open the preview with 'imv'
print(f"exec preview: {preview_command}")
_wm.command(f'exec {preview_command}')
#
# start the main loop for the window manager
# basically wait for the preview listener to fire
# when the preview window opens, a message is sent to give it focus
# afterwards we exit
_wm.main()

View file

@ -0,0 +1,203 @@
#!/usr/bin/env python3
# pylint: disable=line-too-long
"""
A smart but also lazy login autostart manager for i3/Sway.
Will conditionally exec other things defined in a YML dict. ie: every day, work days, or weekends
Required i3/Sway config line:
exec /home/jlay/.config/sway/scripts/startup.py
Config sample:
---
autostarts:
pre: [] # blocking tasks that run every day, before any other section. intended for backups/updates
common: [] # non-blocking tasks that run every day
weekend: [] # blocking tasks for weekends, after 'pre' but before 'common'
work: [] # non-blocking tasks run if Monday through Friday between 8AM - 4PM
Dependencies: python3-i3ipc
"""
import os
import subprocess
from datetime import datetime
from time import sleep
import argparse
from textwrap import dedent
from systemd import journal
import yaml.loader
from i3ipc import Connection
from xdg import BaseDirectory
def log_message(
message: str, level: str, syslog_identifier: str = "sway-autostart"
) -> None:
"""Given `message`, send it to the journal and print
Accepts 'journal' levels. ie: `journal.LOG_{ERR,INFO,CRIT,EMERG}'
"""
valid_levels = {
journal.LOG_EMERG,
journal.LOG_ALERT,
journal.LOG_CRIT,
journal.LOG_ERR,
journal.LOG_WARNING,
journal.LOG_NOTICE,
journal.LOG_INFO,
journal.LOG_DEBUG,
}
if level not in valid_levels:
raise ValueError(f"Invalid log level: {level}")
print(message)
journal.send(message, PRIORITY=level, SYSLOG_IDENTIFIER=syslog_identifier)
def parse_args():
"""If run interactively, this provides arg function to the user"""
description_text = dedent(
f"""\
A smart but also lazy login autostart manager for i3/Sway.
Will conditionally exec other things defined in a YML dict. ie: every day, work days, or weekends
Required i3/Sway config line:
exec {os.path.abspath(__file__)}
Config sample:
---
autostarts:
pre: [] # blocking tasks that run every day, before any other section. intended for backups/updates
common: [] # non-blocking tasks that run every day
weekend: [] # blocking tasks for weekends, after 'pre' but before 'common'
work: [] # non-blocking tasks run if Monday through Friday between 8AM - 4PM
"""
)
class PlainDefaultFormatter(
argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter
):
"""Combines two ArgParse formatter classes:
- argparse.ArgumentDefaultsHelpFormatter
- argparse.RawDescriptionHelpFormatter"""
parser = argparse.ArgumentParser(
description=description_text, formatter_class=PlainDefaultFormatter
)
# Default path for the config
default_config = os.path.join(BaseDirectory.xdg_config_home, "autostart-i3ipc.yml")
parser.add_argument(
"-c",
"--config",
default=default_config,
help="Path to the YML configuration file.",
)
return parser.parse_args()
# OOPy way to determine if it's a work day -- mon<->friday, 3AM<->5PM
class WorkTime(datetime):
"""datetime but with work on the mind"""
def is_workday(self):
"""determine if it's a work day: monday-friday between 3AM and 5PM.
Use .vacation file to go on vacation"""
# first check if ~/.vacation exists - if so, not a work day
if os.path.isfile(os.path.expanduser("~/.vacation")):
return False
# note: last number in range isn't included
if not self.is_weekend() and self.hour in range(8, 16):
return True
return False
def is_weekend(self):
"""determine if it's the weekend or not, ISO week day outside 1-5"""
if self.isoweekday() not in range(1, 6):
return True
return False
if __name__ == "__main__":
args = parse_args()
config_path = args.config
# get the current time
now = WorkTime.now()
# determine if it's a work day using WorkTime above
workday = now.is_workday()
weekend = now.is_weekend()
# initialize empty lists for the different categories
wants = [] # non-blocking tasks from 'common' and 'workday' sections in config
pre_list = [] # blocking tasks before the rest
weekend_list = [] # non-blocking tasks for weekend days/logins
# check the config file for existence/structure. if found, extend the lists
if os.path.exists(config_path):
print(f"found/loading config: '{config_path}'")
with open(config_path, "r", encoding="utf-8") as _config:
config_file = yaml.load(_config, Loader=yaml.FullLoader)
try:
loaded_starts = config_file["autostarts"]
wants.extend(loaded_starts["common"])
if loaded_starts["pre"]:
pre_list.extend(loaded_starts["pre"])
if workday:
wants.extend(loaded_starts["work"])
if weekend:
weekend_list.extend(loaded_starts["weekend"])
except KeyError as key_err:
log_message(
f"Key not found in {config_path}: {key_err.args[0]}",
journal.LOG_ERR,
)
except NameError as name_err:
log_message(f"name error: {name_err}", journal.LOG_ERR)
# get the party started, create a connection to the window manager
_wm = Connection(auto_reconnect=True)
# start the blocking tasks - 'pre' and 'weekend'
# avoid sending them to the WM, would become backgrounded/async
for pre_item in pre_list:
try:
log_message(
f'running (blocking) "pre" task: "{pre_item}"', journal.LOG_INFO
)
subprocess.run(pre_item, shell=True, check=False)
except subprocess.CalledProcessError as pre_ex:
log_message(f'failed "{pre_item}": {pre_ex.output}', journal.LOG_ERR)
if weekend:
for weekend_item in weekend_list:
try:
log_message(
f'running (blocking) "weekend" task: "{weekend_item}"',
journal.LOG_INFO,
)
subprocess.check_output(weekend_item, shell=True)
except subprocess.CalledProcessError as weekend_except:
log_message(
f'Exception during "{weekend_item}": {weekend_except.output}',
journal.LOG_ERR,
)
# launch 'common' and 'work' tasks; not expected to block, sent to window manager
for wanteditem in wants:
command = "exec " + wanteditem
log_message(f'sending to WM: "{command}"', journal.LOG_INFO)
reply = _wm.command(command)
sleep(0.1)
if reply[0].error:
# note: this doesn't check return codes
# serves to check if there was a parsing/comm. error with the WM
log_message(
f'autostart "{command}" failed, couldn\'t reach WM', journal.LOG_ERR
)
_wm.main_quit()

View file

@ -0,0 +1,105 @@
#!/usr/bin/env python3
'''Manages PulseAudio volume for a sink (output) by percentage
Run by Sway on XF86Audio{Raise,Lower}Volume'''
import argparse
import pulsectl
from pydbus import SessionBus
def get_pulse():
'''Returns a Pulse object for inspection/modification'''
return pulsectl.Pulse('volume-changer')
def get_sink(pulse, sink_name=None):
'''Get / return a PulseAudio sink (output) object by name.
Used to query and set the volume. If `sink_name` is omitted, return *default*
Args:
pulse (Pulse): The Pulse object to query, see `get_pulse()`
sink_name (str, optional): The name of the PulseAudio sink to look for.
If omitted the default sink is assumed
Returns:
pulsectl.PulseSinkInfo'''
if sink_name is None:
return pulse.get_sink_by_name(pulse.server_info().default_sink_name)
return pulse.get_sink_by_name(sink_name)
def get_volume_rounded(pulse, sink):
'''Return the volume of the provided sink
Returned as a rounded int averaged across channels, assumed linked'''
return round(pulse.volume_get_all_chans(sink) * 100)
def set_volume(pulse, sink, adjustment):
'''Changes the PulseAudio `sink` volume by the given `adjustment`
`sink` should be a pactl object; see `get_sink`
`adjustment` should be a float, ie 0.01 for *raising* 1%
Invert (* -1) to lower'''
pulse.volume_change_all_chans(sink, adjustment)
# Create argument parser
parser = argparse.ArgumentParser(description='Change audio volume.')
parser.add_argument('direction', choices=['raise', 'lower'], help='The direction to change the volume.')
parser.add_argument('percentage', nargs='?', default=1, type=int, help='The percentage to change the volume.')
parser.add_argument('--sink', default=None, help='The PulseAudio sink (name) to manage.')
# Parse arguments
args = parser.parse_args()
# Calculate the volume change as a float, inverse it if *lowering*
# used as a multiplier
change = args.percentage / 100
if args.direction == 'lower':
change = change * -1
# construct empty dict for JSON output/data
# interesting info is appended later
data = {"sink": "",
"change": "",
"start_vol": "",
"new_vol": ""}
# connect to the notification bus
notifications = SessionBus().get('.Notifications')
# get pulse / connect
try:
with get_pulse() as _p:
# query the default sink
sink_def = get_sink(pulse=_p)
# get the starting vol
start_vol = get_volume_rounded(pulse=_p, sink=sink_def)
# change the volume
set_volume(pulse=_p, sink=sink_def, adjustment=change)
# query the volume again
new_volume = get_volume_rounded(pulse=_p, sink=sink_def)
# construct data dict for CLI output/reference
data['sink'] = sink_def.name
data['change'] = change
data['start_vol'] = start_vol
data['new_vol'] = new_volume
# Create a desktop notification
notification_id = notifications.Notify(
'volume-changer', 0, 'dialog-information',
'Volume Change',
f"Now {data['new_vol']}%, was {data['start_vol']}%",
[], {}, 1000)
except pulsectl.PulseError as e:
data['sink'] = None
data['change'] = 'Impossible, exception: {e}'
# notify that we couldn't work with pulseaudio/compatible daemon
notification_id = notifications.Notify(
'volume-changer', 0, 'dialog-error',
'Volume Change',
f"Exception: {e}",
[], {}, 1000)
print(data, flush=True)

View file

@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""
wallpaper.py - random wallpaper/output utility for i3/Sway
usage: wallpaper.py [-h] [--select {common,unique}] directory
Selection modes:
Common: One wallpaper selected and shared for *all* displays
Unique: One wallpaper selected for *each* display
"""
import argparse
import os
import sys
import random
import asyncio
from typing import List
from i3ipc.aio import Connection
EXTS = ['.jpg', '.jpeg', '.png', '.bmp', '.gif']
def parse_args():
'''Handles argparse on startup'''
parser = argparse.ArgumentParser(description='Random Wallpaper Setter for Sway')
parser.add_argument('directory', type=str, help='Directory containing wallpapers')
parser.add_argument('--select',
type=str,
choices=['common', 'unique'],
default='common',
help='Wallpaper selection mode: all displays, or each?')
return parser.parse_args()
async def main():
'''you know what it is'''
args = parse_args()
sway = await Connection(auto_reconnect=True).connect()
def list_image_files(d: str) -> List[str]:
'''
Given the path to a *'directory'*, returns a list of image files to consider for wallpapers.
'''
return [os.path.join(d, f) for f in os.listdir(d) if os.path.splitext(f)[1].lower() in EXTS]
async def set_wallpaper(file_path: str, output=None):
'''
Given an image path, sets the wallpaper for (optional) outputs in i3/Sway.
If no output is specified then *all* will receive the wallpaper.
'''
print(f"{output if output else 'all'}: wallpaper='{file_path}'")
if output:
await sway.command(f'output "{output}" bg "{file_path}" fill')
else:
for _output in await sway.get_outputs():
await sway.command(f'output "{_output.name}" bg "{file_path}" fill')
if os.path.isdir(args.directory):
image_files = list_image_files(args.directory)
else:
sys.exit(f'ERR: not a directory: {args.directory}')
if not image_files:
print("No image files found in the specified directory.")
return
print(f'Found {len(image_files)} candidate image files')
outputs = await sway.get_outputs()
if args.select == 'common':
wallpaper = random.choice(image_files)
await set_wallpaper(wallpaper)
else: # args.select == 'unique', we need to determine a wallpaper for each display
for output in outputs:
if len(image_files) == 0:
print(f"Not enough images in '{args.directory}' for each display.")
break
wallpaper = random.choice(image_files)
image_files.remove(wallpaper)
await set_wallpaper(wallpaper, output.name)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
pass