[WS] Implement basic workspace system

* get_started.py sets up a new workspace in a project folder
* build.py builds the project using info from Tupfile.lua
* run.py moves compiled program into kolibri.img and boots qemu on it

Only supports very simple programs for now (tested on few demos).
Only tested on Windows for now.

git-svn-id: svn://kolibrios.org@9357 a494cfbc-eb01-0410-851d-a64ba20cac60
This commit is contained in:
Magomed Kostoev (mkostoevr) 2021-12-02 20:01:38 +00:00
parent 41c5451c55
commit 6d89432b67
5 changed files with 1106 additions and 0 deletions

34
_tools/get_started.py Normal file
View File

@ -0,0 +1,34 @@
#!/usr/bin/python3
# Copyright Magomed Kostoev
# Published under MIT license
import os
def log(s, end = "\n"):
print(s, end = end, flush = True)
def install_python_script(src, dst, tools_lib):
log(f"Copying {src}... ", end = "")
with open(src) as src_file:
script = src_file.read()
tools_lib_escaped = tools_lib.replace("\\", "\\\\")
repl_from = "path_to_lib = '../lib'"
repl_to = f"path_to_lib ='{tools_lib_escaped}'"
script = script.replace(repl_from, repl_to, 1)
with open(dst, "w") as dst_file:
dst_file.write(script)
log(f"Done")
if __name__ == "__main__":
tools_get_started_py = os.path.abspath(__file__)
tools = os.sep.join(tools_get_started_py.split(os.sep)[:-1])
tools_lib = os.sep.join([tools, "lib"])
tools_workspace = os.sep.join([tools, "workspace"])
# Copy scripts from _tools/workspace to current folder, but let them know
# where the _tools/lib is (change their value of tools_lib variable)
tools_workspace_run_py = os.sep.join([tools_workspace, "run.py"])
tools_workspace_build_py = os.sep.join([tools_workspace, "build.py"])
install_python_script(tools_workspace_run_py, "run.py", tools_lib)
install_python_script(tools_workspace_build_py, "build.py", tools_lib)

792
_tools/lib/makeflop.py Normal file
View File

@ -0,0 +1,792 @@
#!/usr/bin/env python3
#
# makeflop.py
# Version 3
# Brad Smith, 2019
# http://rainwarrior.ca
#
# Simple file operations for a FAT12 floppy disk image.
# Public domain.
import sys
assert sys.version_info[0] == 3, "Python 3 required."
import struct
import datetime
import os
class Floppy:
"""
Simple file operations for a FAT12 floppy disk image.
Floppy() - creates a blank DOS formatted 1.44MB disk.
Floppy(data) - parses a disk from a given array of bytes.
.open(filename) - loads a file and returns a Floppy from its data.
.save(filename) - saves the disk image to a file.
.flush() - Updates the .data member with any pending changes.
.data - A bytearray of the disk image.
.files() - Returns a list of strings, each is a file or directory. Directories end with a backslash.
.delete_path(path) - Deletes a file or directory (recursive).
.add_dir_path(path) - Creates a new empty directory, if it does not already exist (recursive). Returns cluster of directory, or -1 if failed.
.add_file_path(path,data) - Creates a new file (creating directory if needed) with the given data. Returns False if failed.
.extract_file_path(path) - Returns a bytearray of the file at the given path, None if not found.
.set_volume_id(id=None) - Sets the 32-bit volume ID. Use with no arguments to generate ID from current time.
.set_volume_label(label) - Sets the 11-character volume label.
.boot_info() - Returns a string displaying boot sector information.
.fat_info() - Returns a very long string of all 12-bit FAT entries.
.files_info() - Returns a string displaying the files() list.
.add_all(path,prefix) - Adds all files from local path to disk (uppercased). Use prefix to specify a target directory. Returns False if any failed.
.extract_all(path) - Dumps entire contents of disk to local path.
.find_path(path) - Returns a Floppy.FileEntry.
FileEntry.info() - Returns and information string about a FileEntry.
This class provides some basic interface to a FAT12 floppy disk image.
Some things are fragile, in particular filenames that are too long,
or references to clusters outside the disk may cause exceptions.
The FAT can be accessed directly with some internal functions (see implementation)
and changes will be applied to the stored .data image with .flush().
Example:
f = Floppy.open("disk.img")
print(f.boot_info()) # list boot information about disk
print(f.file_info()) # list files and directories
f.extract_all("disk_dump\\")
f.delete_path("DIR\\FILE.TXT") # delete a file
f.delete_path("DIR") # delete an entire directory
f.add_file_path("DIR\\NEW.TXT",open("new.txt","rb").read()) # add a file, directory automatically created
f.add_dir_path("NEWDIR") # creates a new directory
f.add_all("add\\","ADDED\\") # adds all files from a local directory to to a specified floppy directory.
f.set_volume_id() # generates a new volume ID
f.set_volume_label("MY DISK") # changes the volume label
f.save("new.img")
"""
EMPTY = 0xE5 # incipit value for an empty directory entry
def _filestring(s, length):
"""Creates an ASCII string, padded to length with spaces ($20)."""
b = bytearray(s.encode("ASCII"))
b = b + bytearray([0x20] * (length - len(b)))
if len(b) != length:
raise self.Error("File string '%s' too long? (%d != %d)" % (s,len(b),length))
return b
class Error(Exception):
"""Floppy has had an error."""
pass
class FileEntry:
"""A directory entry for a file."""
def __init__(self, data=None, dir_cluster=-1, dir_index=-1):
"""Unpacks a 32 byte directory entry into a FileEntry structure."""
if data == None:
data = bytearray([Floppy.EMPTY]+([0]*31))
self.data = bytearray(data)
self.path = ""
self.incipit = data[0]
if self.incipit != 0x00 and self.incipit != Floppy.EMPTY:
filename = data[0:8].decode("cp866")
filename = filename.rstrip(" ")
extension = data[8:11].decode("cp866")
extension = extension.rstrip(" ")
self.path = filename
if len(extension) > 0:
self.path = self.path + "." + extension
block = struct.unpack("<BHHHHHHHHL",data[11:32])
self.attributes = block[0]
self.write_time = block[6]
self.write_date = block[7]
self.cluster = block[8]
self.size = block[9]
self.dir_cluster = dir_cluster
self.dir_index = dir_index
def compile(self):
"""Commits any changed data to the entry, rebuilds and returns the 32 byte structure."""
filename = ""
extension = ""
period = self.path.find(".")
if (period >= 0):
filename = self.path[0:period]
extension = self.path[period+1:]
else:
filename = self.path
if self.incipit != 0x00 and self.incipit != 0xEF:
self.data[0:8] = Floppy._filestring(filename,8)
self.data[8:11] = Floppy._filestring(extension,3)
else:
self.data[0] = self.incipit
self.data[11] = self.attributes
self.data[22:32] = bytearray(struct.pack("<HHHL",
self.write_time,
self.write_date,
self.cluster,
self.size))
return bytearray(self.data)
def info(self):
"""String of information about a FileEntry."""
s = ""
s += "Path: [%s]\n" % self.path
s += "Incipit: %02X\n" % self.incipit
s += "Attributes: %02X\n" % self.attributes
s += "Write: %04X %04X\n" % (self.write_date, self.write_time)
s += "Cluster: %03X\n" % self.cluster
s += "Size: %d bytes\n" % self.size
s += "Directory: %03X, %d\n" % (self.dir_cluster, self.dir_index)
return s
def fat_time(year, month, day, hour, minute, second):
"""Builds a FAT12 date/time entry."""
date = ((year - 1980) << 9) | ((month) << 5) | day
time = (hour << 11) | (minute << 5) | (second >> 1)
return (date, time)
def fat_time_now():
"""Builds the current time as a FAT12 date/time entry."""
now = datetime.datetime.now()
return Floppy.FileEntry.fat_time(now.year, now.month, now.day, now.hour, now.minute, now.second)
def set_name(self,s):
"""Sets the filename and updates incipit."""
self.path = s
self.incipit = Floppy._filestring(s,12)[0]
def set_now(self):
"""Updates modified date/time to now."""
(date,time) = Floppy.FileEntry.fat_time_now()
self.write_date = date
self.write_time = time
def new_file(name):
"""Generate a new file entry."""
e = Floppy.FileEntry()
e.set_name(name)
e.set_now()
e.attributes = 0x00
return e
def new_dir(name):
"""Generate a new subdirectory entry."""
e = Floppy.FileEntry()
e.set_name(name)
e.set_now()
e.attributes = 0x10
return e
def new_volume(name):
"""Generate a new volume entry."""
e = Floppy.FileEntry()
if len(name) > 8:
name = name[0:8] + "." + name[8:]
e.set_name(name)
e.set_now()
e.attributes = 0x08
return e
def new_terminal():
"""Generate a new directory terminating entry."""
e = Floppy.FileEntry()
e.incipit = 0x00
return e
# blank formatted 1.44MB MS-DOS 5.0 non-system floppy
blank_floppy = [ # boot sector, 2xFAT, "BLANK" volume label, F6 fill
0xEB,0x3C,0x90,0x4D,0x53,0x44,0x4F,0x53,0x35,0x2E,0x30,0x00,0x02,0x01,0x01,0x00,
0x02,0xE0,0x00,0x40,0x0B,0xF0,0x09,0x00,0x12,0x00,0x02,0x00,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0x29,0xE3,0x16,0x53,0x1B,0x42,0x4C,0x41,0x4E,0x4B,
0x20,0x20,0x20,0x20,0x20,0x20,0x46,0x41,0x54,0x31,0x32,0x20,0x20,0x20,0xFA,0x33,
0xC0,0x8E,0xD0,0xBC,0x00,0x7C,0x16,0x07,0xBB,0x78,0x00,0x36,0xC5,0x37,0x1E,0x56,
0x16,0x53,0xBF,0x3E,0x7C,0xB9,0x0B,0x00,0xFC,0xF3,0xA4,0x06,0x1F,0xC6,0x45,0xFE,
0x0F,0x8B,0x0E,0x18,0x7C,0x88,0x4D,0xF9,0x89,0x47,0x02,0xC7,0x07,0x3E,0x7C,0xFB,
0xCD,0x13,0x72,0x79,0x33,0xC0,0x39,0x06,0x13,0x7C,0x74,0x08,0x8B,0x0E,0x13,0x7C,
0x89,0x0E,0x20,0x7C,0xA0,0x10,0x7C,0xF7,0x26,0x16,0x7C,0x03,0x06,0x1C,0x7C,0x13,
0x16,0x1E,0x7C,0x03,0x06,0x0E,0x7C,0x83,0xD2,0x00,0xA3,0x50,0x7C,0x89,0x16,0x52,
0x7C,0xA3,0x49,0x7C,0x89,0x16,0x4B,0x7C,0xB8,0x20,0x00,0xF7,0x26,0x11,0x7C,0x8B,
0x1E,0x0B,0x7C,0x03,0xC3,0x48,0xF7,0xF3,0x01,0x06,0x49,0x7C,0x83,0x16,0x4B,0x7C,
0x00,0xBB,0x00,0x05,0x8B,0x16,0x52,0x7C,0xA1,0x50,0x7C,0xE8,0x92,0x00,0x72,0x1D,
0xB0,0x01,0xE8,0xAC,0x00,0x72,0x16,0x8B,0xFB,0xB9,0x0B,0x00,0xBE,0xE6,0x7D,0xF3,
0xA6,0x75,0x0A,0x8D,0x7F,0x20,0xB9,0x0B,0x00,0xF3,0xA6,0x74,0x18,0xBE,0x9E,0x7D,
0xE8,0x5F,0x00,0x33,0xC0,0xCD,0x16,0x5E,0x1F,0x8F,0x04,0x8F,0x44,0x02,0xCD,0x19,
0x58,0x58,0x58,0xEB,0xE8,0x8B,0x47,0x1A,0x48,0x48,0x8A,0x1E,0x0D,0x7C,0x32,0xFF,
0xF7,0xE3,0x03,0x06,0x49,0x7C,0x13,0x16,0x4B,0x7C,0xBB,0x00,0x07,0xB9,0x03,0x00,
0x50,0x52,0x51,0xE8,0x3A,0x00,0x72,0xD8,0xB0,0x01,0xE8,0x54,0x00,0x59,0x5A,0x58,
0x72,0xBB,0x05,0x01,0x00,0x83,0xD2,0x00,0x03,0x1E,0x0B,0x7C,0xE2,0xE2,0x8A,0x2E,
0x15,0x7C,0x8A,0x16,0x24,0x7C,0x8B,0x1E,0x49,0x7C,0xA1,0x4B,0x7C,0xEA,0x00,0x00,
0x70,0x00,0xAC,0x0A,0xC0,0x74,0x29,0xB4,0x0E,0xBB,0x07,0x00,0xCD,0x10,0xEB,0xF2,
0x3B,0x16,0x18,0x7C,0x73,0x19,0xF7,0x36,0x18,0x7C,0xFE,0xC2,0x88,0x16,0x4F,0x7C,
0x33,0xD2,0xF7,0x36,0x1A,0x7C,0x88,0x16,0x25,0x7C,0xA3,0x4D,0x7C,0xF8,0xC3,0xF9,
0xC3,0xB4,0x02,0x8B,0x16,0x4D,0x7C,0xB1,0x06,0xD2,0xE6,0x0A,0x36,0x4F,0x7C,0x8B,
0xCA,0x86,0xE9,0x8A,0x16,0x24,0x7C,0x8A,0x36,0x25,0x7C,0xCD,0x13,0xC3,0x0D,0x0A,
0x4E,0x6F,0x6E,0x2D,0x53,0x79,0x73,0x74,0x65,0x6D,0x20,0x64,0x69,0x73,0x6B,0x20,
0x6F,0x72,0x20,0x64,0x69,0x73,0x6B,0x20,0x65,0x72,0x72,0x6F,0x72,0x0D,0x0A,0x52,
0x65,0x70,0x6C,0x61,0x63,0x65,0x20,0x61,0x6E,0x64,0x20,0x70,0x72,0x65,0x73,0x73,
0x20,0x61,0x6E,0x79,0x20,0x6B,0x65,0x79,0x20,0x77,0x68,0x65,0x6E,0x20,0x72,0x65,
0x61,0x64,0x79,0x0D,0x0A,0x00,0x49,0x4F,0x20,0x20,0x20,0x20,0x20,0x20,0x53,0x59,
0x53,0x4D,0x53,0x44,0x4F,0x53,0x20,0x20,0x20,0x53,0x59,0x53,0x00,0x00,0x55,0xAA,
] + \
[0xF0,0xFF,0xFF] + ([0]*(0x1200-3)) + \
[0xF0,0xFF,0xFF] + ([0]*(0x1200-3)) + [
0x42,0x4C,0x41,0x4E,0x4B,0x20,0x20,0x20,0x20,0x20,0x20,0x28,0x00,0x00,0x00,0x00,
0x00,0x00,0x00,0x00,0x00,0x00,0xEE,0x7C,0x24,0x00,0x00,0x00,0x00,0x00,0x00,0x00,
] + ([0] * (0x4200-0x2620)) + \
([0xF6] * (0x168000-0x4200))
def __init__(self,data=blank_floppy):
"""Create Floppy() instance from image bytes, or pre-formatted blank DOS floppy by default."""
self.data = bytearray(data)
self._boot_open()
self._fat_open()
def _boot_open(self):
"""Parses the boot sector."""
if (len(self.data) < 38):
raise self.Error("Not enough data in image for boot sector. (%d bytes)" % len(data))
boot = struct.unpack("<HBHBHHBHHH",self.data[11:28])
self.sector_size = boot[0]
self.cluster_sects = boot[1]
self.reserved_sects = boot[2]
self.fat_count = boot[3]
self.root_max = boot[4]
self.sectors = boot[5]
self.fat_sects = boot[7]
self.track_sects = boot[8]
self.heads = boot[9]
self.volume_id = 0
self.volume_label = ""
if self.data[38] == 0x29 and len(self.data) >= 54:
self.volume_id = struct.unpack("<L",self.data[39:43])[0]
self.volume_label = self.data[43:54].decode("ASCII").rstrip(" ")
if (self.sectors * self.sector_size) > len(self.data):
raise self.Error("Not enough data to contain %d x %d byte sectors? (%d bytes)" %
(self.sectors, self.sector_size, len(self.data)))
self.root = self.sector_size * (self.reserved_sects + (self.fat_count * self.fat_sects))
root_sectors = ((self.root_max * 32) + (self.sector_size-1)) // self.sector_size # round up to fill sector
self.cluster2 = self.root + (self.sector_size * root_sectors)
def _boot_flush(self):
"""Commits changes to the boot sector."""
boot = struct.pack("<HBHBHHBHHH",
self.sector_size,
self.cluster_sects,
self.reserved_sects,
self.fat_count,
self.root_max,
self.sectors,
self.data[21],
self.fat_sects,
self.track_sects,
self.heads)
self.data[11:28] = bytearray(boot)
if self.data[38] == 0x29 and len(self.data) >= 54:
self.data[39:43] = bytearray(struct.pack("<L",self.volume_id))
self.data[43:54] = Floppy._filestring(self.volume_label,11)
def boot_info(self):
"""String of information about the boot sector."""
s = ""
s += "Volume Label: [%s]\n" % self.volume_label
s += "Volume ID: %04X\n" % self.volume_id
s += "Sector size: %d bytes\n" % self.sector_size
s += "Cluster size: %d sectors\n" % self.cluster_sects
s += "Reserved sectors: %d\n" % self.reserved_sects
s += "Number of FATs: %d\n" % self.fat_count
s += "Maximum root entries: %d\n" % self.root_max
s += "Total sectors: %d\n" % self.sectors
s += "FAT size: %d sectors\n" % self.fat_sects
s += "Track size: %d sectors\n" % self.track_sects
s += "Heads: %d\n" % self.heads
s += "Root directory: %08X\n" % self.root
s += "Cluster 2: %08X\n" % self.cluster2
return s
def _fat_open(self):
"""Parses the FAT table."""
fat_start = self.reserved_sects * self.sector_size
fat_sects = self.fat_sects * self.sector_size
fat_end = fat_start + fat_sects
# make sure they're in the image
if (self.sectors < (self.reserved_sects + (self.fat_count * self.fat_sects))):
raise self.Error("Not enough sectors to contain %d + %d x %d FAT tables? (%d sectors)" %
(self.reserved_sects, self.fat_count, self.fat_sects, self.sectors))
if (self.fat_count < 1) or (self.fat_sects < 1):
raise self.Error("No FAT tables? (%d x %d FAT sectors)" %
(self.fat_count, self.fat_sects))
# verify FAT tables match
for i in range(1,self.fat_count):
fat2_start = fat_start + (fat_sects * i)
fat2_end = fat2_start + fat_sects
if self.data[fat_start:fat_end] != self.data[fat2_start:fat2_end]:
raise self.Error("FAT mismatch in table %d." % i)
# read FAT 0
self.fat = []
e = fat_start
while (e+2) <= fat_end:
entry = 0
if (len(self.fat) & 1) == 0:
entry = self.data[e+0] | ((self.data[e+1] & 0x0F) << 8)
e += 1
else:
entry = ((self.data[e+0] & 0xF0) >> 4) | (self.data[e+1] << 4)
e += 2
self.fat.append(entry)
def _fat_flush(self):
"""Commits changes to the FAT table."""
fat_start = self.reserved_sects * self.sector_size
fat_sects = self.fat_sects * self.sector_size
fat_end = fat_start + fat_sects
# build FAT 0
e = self.reserved_sects * self.sector_size
for i in range(len(self.fat)):
entry = self.fat[i]
if (i & 1) == 0:
self.data[e+0] = entry & 0xFF
self.data[e+1] = (self.data[e+1] & 0xF0) | ((entry >> 8) & 0x0F)
e += 1
else:
self.data[e+0] = (self.data[e+0] & 0x0F) | ((entry << 4) & 0xF0)
self.data[e+1] = (entry >> 4) & 0xFF
e += 2
# copy to all tables
for i in range(1,self.fat_count):
fat2_start = fat_start + (fat_sects * i)
fat2_end = fat2_start + fat_sects
self.data[fat2_start:fat2_end] = self.data[fat_start:fat_end]
def fat_info(self):
"""String of information about the FAT."""
per_line = 32
s = ""
for i in range(len(self.fat)):
if (i % per_line) == 0:
s += "%03X: " % i
if (i % per_line) == (per_line // 2):
s += " " # extra space to mark 16
s += " %03X" % self.fat[i]
if (i % per_line) == (per_line - 1):
s += "\n"
return s
def _cluster_offset(self, cluster):
"""Image offset of a FAT indexed logical cluster."""
if cluster < 2:
return self.root
return self.cluster2 + (self.sector_size * self.cluster_sects * (cluster-2))
def _read_chain(self, cluster, size):
data = bytearray()
"""Returns up to size bytes from a chain of clusters starting at cluster."""
while cluster < 0xFF0:
offset = self._cluster_offset(cluster)
#print("read_chain(%03X,%d) at %08X" % (cluster,size,offset))
if cluster < 2:
return data + bytearray(self.data[self.root:self.root+size]) # root directory
read = min(size,(self.sector_size * self.cluster_sects))
data = data + bytearray(self.data[offset:offset+read])
size -= read
if (size < 1):
return data
cluster = self.fat[cluster]
return data
def _read_dir_chain(self, cluster):
"""Reads an entire directory chain starting at the given cluster (0 for root)."""
if cluster < 2:
# root directory is contiguous
return self.data[self.root:self.root+(self.root_max*32)]
# directories just occupy as many clusters as in their FAT chain, using a dummy max size
return self._read_chain(cluster, self.sector_size*self.sectors//self.cluster_sects)
def _delete_chain(self, cluster):
"""Deletes a FAT chain."""
while cluster < 0xFF0 and cluster >= 2:
link = self.fat[cluster]
self.fat[cluster] = 0
cluster = link
def _add_chain(self,data):
"""Adds a block of data to the disk and creates its FAT chain. Returns start cluster, or -1 for failure."""
cluster_size = self.sector_size * self.cluster_sects
clusters = (len(data) + cluster_size-1) // cluster_size
if clusters < 1:
clusters = 1
# find a chain of free clusters
chain = []
for i in range(2,len(self.fat)):
if self.fat[i] == 0:
chain.append(i)
if len(chain) >= clusters:
break
if len(chain) < clusters:
return -1 # out of space
# store the FAT chain
start_cluster = chain[0]
for i in range(0,len(chain)-1):
self.fat[chain[i]] = chain[i+1]
self.fat[chain[len(chain)-1]] = 0xFFF
# store the data in the given clusters
data = bytearray(data)
c = 0
while len(data) > 0:
write = min(len(data),cluster_size)
offset = self._cluster_offset(chain[c])
self.data[offset:offset+write] = data[0:write]
c += 1
data = data[write:]
# done
return start_cluster
def _files_dir(self, cluster, path):
"""Returns a list of files in the directory starting at the given cluster. Recursive."""
#print("files_dir(%03X,'%s')" % (cluster, path))
entries = []
directory = self._read_dir_chain(cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],cluster,i)
#print(("entry %d (%08X)\n"%(i,self._dir_entry_offset(cluster,i)))+e.info())
if e.incipit == 0x00: # end of directory
return entries
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # volume label
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == "." or e.path == "..":
continue
subdir = path + e.path + "\\"
entries.append(subdir)
entries = entries + self._files_dir(e.cluster,subdir)
else:
entries.append(path + e.path)
return entries
def files(self):
"""Returns a list of files in the image."""
root = self.sector_size * (self.reserved_sects + (self.fat_count * self.fat_sects))
return self._files_dir(0,"")
def files_info(self):
"""String of the file list."""
s = ""
for path in self.files():
s += path + "\n"
return s
def _dir_entry_offset(self,cluster,dir_index):
"""Find the offset in self.data to a particular directory entry."""
if (cluster < 2):
return self.root + (32 * dir_index)
if (cluster >= 0xFF0):
raise self.Error("Directory entry %d not in its cluster chain?" % dir_index)
per_cluster = (self.sector_size*self.cluster_sects)//32
if (dir_index < per_cluster): # within this cluster
return self._cluster_offset(cluster) + (32 * dir_index)
# continue to next cluster
return self._dir_entry_offset(self.fat[cluster],dir_index-per_cluster)
def delete_file(self, entry):
"""Deletes a FileEntry."""
self._delete_chain(entry.cluster) # delete its FAT chain
offset = self._dir_entry_offset(entry.dir_cluster,entry.dir_index)
self.data[offset+0] = Floppy.EMPTY # empty this entry
def _find_path_dir(self, cluster, path):
"""Recursive find path, breaking out subdirectories progressively."""
#print("_find_path_dir(%03X,'%s')" % (cluster,path))
separator = path.find("\\")
path_seek = path
path_next = ""
if separator >= 0:
path_seek = path[0:separator]
path_next = path[separator+1:]
directory = self._read_dir_chain(cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],cluster,i)
if e.incipit == 0x00: # end of directory
return None
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # volume label
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == "." or e.path == "..":
continue
if e.path == path_seek:
if (len(path_next) > 0):
return self._find_path_dir(e.cluster, path_next)
else:
return e
elif e.path == path_seek and path_next == "":
return e
return None
def find_path(self, path):
"""Finds a FileEntry for a given path."""
return self._find_path_dir(0,path)
def _delete_tree(self,de):
"""Recursively deletes directory entries."""
#print("_delete_tree\n" + de.info())
directory = self._read_dir_chain(de.cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],de.cluster,i)
#print(e.info())
if e.incipit == 0x00: # end of directory
return
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # volume label
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == "." or e.path == "..":
continue
self._delete_tree(e) # recurse
self.delete_file(e)
else:
self.delete_file(e)
def delete_path(self, path):
"""Finds a file or directory and deletes it (recursive), if it exists. Returns True if successful."""
e = self.find_path(path)
if (e == None):
return False
if (e.attributes & 0x10) != 0:
self._delete_tree(e)
self.delete_file(e)
return True
def _add_entry(self, dir_cluster, entry):
"""
Adds an entry to a directory starting at the given cluster, appending a new cluster if needed.
Sets entry.dir_cluster and entry.dir_index to match their new directory.
Returns False if out of space.
"""
#print(("_add_entry(%d):\n"%dir_cluster) + entry.info())
directory = self._read_dir_chain(dir_cluster)
dir_len = len(directory)//32
i = 0
terminal = False
while i < dir_len:
e = self.FileEntry(directory[(i*32):(i*32)+32],dir_cluster,i)
#print(e.info())
if e.incipit == 0x00:
terminal = True # make sure to add another terminal entry after this one
break
if e.incipit == Floppy.EMPTY:
break
i += 1
# extend directory if out of room
if i >= dir_len:
if dir_cluster < 2:
return False # no room in root
# add a zero-filled page to the end of this directory's FAT chain
chain = self._add_chain(bytearray([0]*(self.sector_size*self.cluster_sects)))
if (chain < 0):
return False # no free clusters
tail = dir_cluster
while self.fat[tail] < 0xFF0:
tail = self.fat[tail]
self.fat[tail] = chain
self.fat[chain] = 0xFFF
# insert entry
entry.dir_cluster = dir_cluster
entry.dir_index = i
offset = self._dir_entry_offset(dir_cluster,i)
self.data[offset:offset+32] = entry.compile()
# add a new terminal if needed
if terminal:
i += 1
if i < dir_len: # if it was the last entry, no new terminal is needed
offset = self._dir_entry_offset(dir_cluster,i)
self.data[offset:offset+32] = Floppy.FileEntry.new_terminal().compile()
# success!
return True
def _add_dir_recursive(self, cluster, path):
"""Recursively creates directory, returns cluster of created dir, -1 if failed."""
#print("_add_dir_recursive(%03X,'%s')"%(cluster,path))
separator = path.find("\\")
path_seek = path
path_next = ""
if separator >= 0:
path_seek = path[0:separator]
path_next = path[separator+1:]
directory = self._read_dir_chain(cluster)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],cluster,i)
#print(e.info())
if e.incipit == 0x00: # end of directory
break
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x10) != 0: # subdirectory
if e.path == path_seek: # already exists
if len(path_next) < 1:
return e.cluster # return existing directory
else:
return self._add_dir_recursive(e.cluster,path_next) # keep descending
# not found: create the directory
dp0 = Floppy.FileEntry.new_dir("_") # .
dp1 = Floppy.FileEntry.new_dir("__") # ..
dp1.cluster = cluster
dir_block = dp0.compile() + dp1.compile() + bytearray([0] * ((self.sector_size*self.cluster_sects)-64))
new_cluster = self._add_chain(dir_block)
if new_cluster < 0:
return -1 # out of space
# fix up "." to point to itself
dp0.cluster = new_cluster
offset = self._dir_entry_offset(new_cluster,0)
self.data[offset:offset+32] = dp0.compile()
# fix special directory names (FileEntry.compile() is incapable of . or ..)
self.data[offset+ 0:offset+11] = bytearray(". ".encode("ASCII"))
self.data[offset+32:offset+43] = bytearray(".. ".encode("ASCII"))
# create entry to point to new directory cluster
new_dir = Floppy.FileEntry.new_dir(path_seek)
new_dir.cluster = new_cluster
if not self._add_entry(cluster, new_dir):
self._delete_chain(new_cluster)
return -1 # out of space
# return the entry if tail is reached, or keep descending
if len(path_next) < 1:
return new_cluster
else:
return self._add_dir_recursive(new_cluster,path_next)
def add_dir_path(self, path):
"""
Recursively ensures that the given directory path exists, creating it if necessary.
Path should not end in backslash.
Returns cluster of directory at path, or -1 if failed.
"""
if (len(path) < 1):
return 0 # root
return self._add_dir_recursive(0,path)
def add_file_path(self, path, data):
"""
Adds the given data as a file at the given path.
Will automatically create directories to complete the path.
Returns False if failed.
"""
self.delete_path(path) # remove file if it already exists
dir_path = ""
file_path = path
separator = path.rfind("\\")
if (separator >= 0):
dir_path = path[0:separator]
file_path = path[separator+1:]
dir_cluster = self.add_dir_path(dir_path)
if dir_cluster < 0:
return False # couldn't find or create directory
cluster = self._add_chain(data)
if (cluster < 0):
return False # out of space
entry = Floppy.FileEntry.new_file(file_path)
entry.cluster = cluster
entry.size = len(data)
if not self._add_entry(dir_cluster, entry):
self._delete_chain(cluster)
return False # out of space for directory entry
offset = self._dir_entry_offset(entry.dir_cluster, entry.dir_index)
self.data[offset:offset+32] = entry.compile()
return True
def extract_file_path(self, path):
"""Finds a file and returns all of its data. None on failure."""
e = self.find_path(path)
if e == None:
return None
return self._read_chain(e.cluster, e.size)
def set_volume_id(self, value=None):
"""Sets the volume ID. None for time-based."""
if (value == None):
(date,time) = Floppy.FileEntry.fat_time_now()
value = time | (date << 16)
self.volume_id = value & 0xFFFFFFFF
def set_volume_label(self, label):
"""Sets the volume label. Creates one if necessary."""
self.data[38] = 0x29
self.volume_label = label
self.data[54:62] = Floppy._filestring("FAT12",8)
# adjust existing volume entry in root
directory = self._read_dir_chain(0)
for i in range(len(directory) // 32):
e = self.FileEntry(directory[(i*32):(i*32)+32],0,i)
if e.incipit == 0x00: # end of directory
break
if e.incipit == Floppy.EMPTY: # empty
continue
if (e.attributes & 0x08) != 0: # existing volume label
offset = self._dir_entry_offset(0,i)
self.data[offset:offset+11] = Floppy._filestring(label,11)
return
# volume entry does not exist in root, add one
self._add_entry(0,Floppy.FileEntry.new_volume(label))
return
def open(filename):
"""Opens a floppy image file and creates a Floppy() instance from it."""
return Floppy(open(filename,"rb").read())
def flush(self):
"""Commits all unfinished changes to self.data image."""
self._fat_flush()
self._boot_flush()
def save(self, filename):
"""Saves image (self.data) to file. Implies flush()."""
self.flush()
open(filename,"wb").write(self.data)
def extract_all(self, out_directory):
"""Extracts all files from image to specified directory."""
for in_path in self.files():
out_path = os.path.join(out_directory,in_path)
out_dir = os.path.dirname(out_path)
if not os.path.exists(out_dir):
try:
os.makedirs(out_dir)
except OSError as e:
if e.errono != errno.EEXIST:
raise
if not in_path.endswith("\\"):
open(out_path,"wb").write(self.extract_file_path(in_path))
print(out_path)
else:
print(out_path)
def add_all(self, in_directory, prefix=""):
"""
Adds all files from specified directory to image.
Files will be uppercased. Long filenames are not checked and will cause an exception.
prefix can be used to prefix a directory path (ending with \) to the added files.
"""
result = True
def dospath(s):
s = s.upper()
s = s.replace("/","\\")
return s
if len(in_directory) < 1:
in_directory = "."
in_directory = os.path.normpath(in_directory) + os.sep
for (root, dirs, files) in os.walk(in_directory):
base = root[len(in_directory):]
for d in dirs:
dir_path = prefix + dospath(os.path.join(base,d))
result = result and (self.add_dir_path(dir_path) >= 0)
print(dir_path + "\\")
for f in files:
file_path = prefix + dospath(os.path.join(base,f))
data = open(os.path.join(root,f),"rb").read()
result = result and self.add_file_path(file_path,data)
print(file_path + " (%d bytes)" % len(data))
return result

View File

@ -0,0 +1,153 @@
# Copyright Magomed Kostoev
# Published under MIT license
class Rule:
pass
def get_config(config_name):
if config_name == "KPACK_CMD":
# Just never pack the file for now
return ""
else:
print(f"Unknown config name: {config_name}")
exit(-1)
def skip_whitespaces(src, ptr):
while len(src) > ptr and src[ptr] == " ":
ptr += 1
return ptr
# Returns True if @src has @string starting from @ptr index
def match_string(src, ptr, string):
if len(src) <= ptr + len(string):
return False
for i in range(len(string)):
if src[ptr + i] != string[i]:
return False
return True
def parse_tup_getconfig(src, ptr):
# Skip get straight to the argument
ptr += len("tup.getconfig(")
ptr = skip_whitespaces(src, ptr)
if src[ptr] != "\"":
print("Expected \"config name\" as tup.getconfig parameter")
exit()
(config_name, ptr) = parse_string(src, ptr)
ptr = skip_whitespaces(src, ptr)
# Skip closing parenthese of the tup.getconfig call
assert(src[ptr] == ")")
ptr += 1
return (get_config(config_name), ptr)
def parse_string(src, ptr):
ptr += 1
string = ""
while src[ptr] != "\"":
string += src[ptr]
ptr += 1
# Skip the closing "\""
ptr += 1
ptr = skip_whitespaces(src, ptr)
# Check if we have concatination here
if match_string(src, ptr, ".."):
# Skip the ".."
ptr += 2
# The expression parsing should result in a string
(string_to_add, ptr) = parse_expression(src, ptr)
# Concat our string to the resulting string
string += string_to_add
return (string, ptr)
def parse_expression(src, ptr):
ptr = skip_whitespaces(src, ptr)
result = "WAT?!"
if src[ptr] == "\"":
(result, ptr) = parse_string(src, ptr)
elif match_string(src, ptr, "tup.getconfig("):
(result, ptr) = parse_tup_getconfig(src, ptr)
else:
print(f"Can't handle anything starting with '{src[ptr]}'")
exit(-1)
ptr = skip_whitespaces(src, ptr)
return (result, ptr)
def expect_comma(src, ptr):
comma_skept = False
ptr = skip_whitespaces(src, ptr)
if src[ptr] == ",":
ptr += 1
return (True, ptr)
else:
return (False, ptr)
def parse_arguments(src, ptr):
result = []
# Parse first argument
(argument, ptr) = parse_expression(src, ptr)
result.append(argument)
(comma_encoutered, ptr) = expect_comma(src, ptr)
# Parse the second argument if it's there
if comma_encoutered:
(argument, ptr) = parse_expression(src, ptr)
result.append(argument)
(comma_encoutered, ptr) = expect_comma(src, ptr)
# Parse third argument if it's there
if comma_encoutered:
(argument, ptr) = parse_expression(src, ptr)
result.append(argument)
return result
def parse_rule(src, ptr):
# Get straight to the first argument
ptr += len("tup.rule(")
# Parse the arguments
args = parse_arguments(src, ptr)
# Build the rule object
result = Rule()
if len(args) == 3:
result.input = args[0]
result.command = args[1]
result.output = args[2]
# Replace %f with input file in rule's command
if type(result.input == str):
result.command = result.command.replace("%f", result.input)
else:
print("Command building with non-string tup.rule's first argument"
+ " isn't implemented")
exit()
# Replace %o with output file in rule's command
if type(result.output == str):
result.command = result.command.replace("%o", result.output)
else:
print("Command building with non-string tup.rule's first argument"
+ " isn't implemented")
exit()
elif len(args) == 2:
result.input = []
result.command = args[0]
result.output = args[1]
else:
print(f"tup.rule can only take 2 or 3 arguments, not {len(args)}")
exit(-1)
# Unify the API - return arrays as input and output
if type(result.input) == str:
result.input = [ result.input ]
else:
assert(type(result.input) == list)
if type(result.output) == str:
result.output = [ result.output ]
else:
assert(type(result.output) == list)
return result
def parse(file_name):
rules = []
with open(file_name) as f:
tupfile = f.read()
rule_begin_index = tupfile.find("tup.rule(")
while (rule_begin_index != -1):
rules.append(parse_rule(tupfile, rule_begin_index))
# Find the next tup.rule call
rule_begin_index = tupfile.find("tup.rule(", rule_begin_index + len("tup.rule("))
return rules

24
_tools/workspace/build.py Normal file
View File

@ -0,0 +1,24 @@
import sys
import os
path_to_lib = '../lib'
sys.path.append(path_to_lib)
import tupfile_parser
def build():
if not os.path.exists("Tupfile.lua"):
print("No Tupfile.lua, can't build anything")
exit()
tup_rules = tupfile_parser.parse("Tupfile.lua")
program_files = []
for rule in tup_rules:
# TODO: Manage source dependencies
# TODO: Inform about tools required for the build
os.system(rule.command)
program_files += rule.output
return program_files
if __name__ == "__main__":
build()

103
_tools/workspace/run.py Normal file
View File

@ -0,0 +1,103 @@
import os
import sys
import shutil
import urllib.request
import subprocess
import build
path_to_lib = '../lib'
sys.path.append(path_to_lib)
from makeflop import Floppy
# TODO: Move into _tools/lib
def get_file_directory(path):
path = path.replace("\\", "/")
if "/" in path:
folder = "/".join(path.split("/")[:-1])
if folder == "":
return "/" # It was a file in the root folder
return folder
else:
return "." # Just a filename, let's return current folder
# TODO: Move into _tools/lib
def is_win32():
return True if sys.platform == "win32" else False
# TODO: Move into _tools/lib
def is_linux():
return True if sys.platform == "linux" or sys.platform == "linux2" else False
# TODO: Move into _tools/lib
def is_osx():
return True if sys.platform == "darwin" else False
# TODO: Move into _tools/lib
def log(s, end = "\n"):
print(s, end = end, flush = True)
# TODO: Move into _tools/lib
def download(link, path):
log(f"Downloading {path}... ", end = "")
urllib.request.urlretrieve(link, path)
log("Done.")
# TODO: Move into _tools/lib
def path(*args):
return os.sep.join(args)
# TODO: Move into _tools/lib
def run_qemu(start_dir = "workspace"):
qemu_command = f"qemu-system-i386"
flags = ""
flags += "-L . " # IDK why it does not work without this
flags += "-m 128 "
flags += f"-drive format=raw,file={start_dir}/kolibri.img,index=0,if=floppy -boot a "
flags += "-vga vmware "
flags += "-net nic,model=rtl8139 -net user "
flags += "-soundhw ac97 "
if is_win32():
qemu_full_path = shutil.which(qemu_command)
qemu_directory = get_file_directory(qemu_full_path)
flags += f"-L {qemu_directory} "
s = f"{qemu_command} {flags}"
qemu_stdout = open(f"{start_dir}/qemu_stdout.log", "w")
qemu_stderr = open(f"{start_dir}/qemu_stderr.log", "w")
if is_win32():
return subprocess.Popen(s, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, shell = True, start_new_session = True)
else:
a = shlex.split(s)
return subprocess.Popen(a, bufsize = 0, stdout = qemu_stdout, stderr = qemu_stderr, stdin = subprocess.DEVNULL, start_new_session = True)
if __name__ == "__main__":
program_files = build.build()
os.makedirs("workspace", exist_ok = True)
if not os.path.exists("workspace/kolibri.img"):
img_url = "http://builds.kolibrios.org/eng/data/data/kolibri.img"
download(img_url, "workspace/kolibri.img")
# Open the IMG
with open("workspace/kolibri.img", "rb") as img:
img_data = img.read()
img = Floppy(img_data)
# Remove unuseful folders
img.delete_path("GAMES")
img.delete_path("DEMOS")
img.delete_path("3D")
log("Moving program files into kolibri image... ", end = "")
for file_name in program_files:
with open(file_name, "rb") as file:
file_data = file.read()
if not img.add_file_path(file_name, file_data):
print(f"Coudn't move {file_name} into IMG")
img.save("workspace/kolibri.img")
log("Done")
# TODO: Autorun
run_qemu()